Tag Archives: Amazon Athena

Field Notes: Gaining Insights into Labeling Jobs for Machine Learning

Post Syndicated from Michael Graumann original https://aws.amazon.com/blogs/architecture/field-notes-gaining-insights-into-labeling-jobs-for-machine-learning/

In an era where more and more data is generated, it becomes critical for businesses to derive value from it. With the help of supervised learning, it is possible to generate models to automatically make predictions or decisions by leveraging historical data. For example, image recognition for self-driving cars, predicting anomalies on X-rays, fraud detection in finance and more. With supervised learning, these models learn from labeled data. The success of those models is highly dependent on readily available, high quality labeled data.

However, you might encounter cases where a high percentage of your pre-existing data is unlabeled. In these situations, providing correct labeling to previously unlabeled data points would directly translate to higher model accuracy.

Amazon SageMaker Ground Truth helps you with exactly that. It lets you build highly accurate training datasets for machine learning quickly. SageMaker Ground Truth provides your labelers with built-in workflows and interfaces for common labeling tasks. This process could take several hours or more depending on the size of your unlabeled dataset, and you might have a need to track the progress easily, preferably in the form of a dashboard.

In this blog post, we show how to gain deep insights into the progress of labeling and the performance of the workers by using Amazon Athena and Amazon QuickSight. We first use Amazon Athena to set up several views with specific insights into the labeling progress. We then reference these views in Amazon QuickSight to visualize the data in a dashboard.

This approach also works for combining multiple AWS services in general. AWS provides many building blocks that you can mix and match to create a unique, integrated solution with cohesive insights. In this blog post we use data produced by one service (Ground Truth), prepare it with another (Athena), and visualize it with a third (QuickSight). The following diagram shows this architecture.

Solution Architecture

ML Solution Architecture

Mapping a JSON structure to a table structure

Ground Truth creates several directories in your Amazon S3 output path. These directories contain the results of your labeling job and other artifacts of the job. The top-level directory for a labeling job has the same name as your labeling job, while the output directories are placed inside it. We will create all insights from what SageMaker Ground Truth calls worker responses.

All respective JSON files reside in the path s3://bucket/<job-name>/annotations/worker-response/.

To analyze the labeling data with Amazon Athena we need to understand the structure of the underlying JSON files. Let’s review the example below. For each item that was labeled, we see the label itself, followed by the submission time and a workerId pointing to an identity. This identity lives in Amazon Cognito, a fully managed service that provides the user directory for our labelers.

{
    "answers": [
        {
            "answerContent": {
                "crowd-classifier": {
                    "label": "Compute"
                }
            },
            "submissionTime": "2020-03-27T10:31:04.210Z",
            "workerId": "private.eu-west-1.1111111111111111",
            "workerMetadata": {
                "identityData": {
                    "identityProviderType": "Cognito",
                    "issuer": "https://cognito-idp.eu-west-1.amazonaws.com/eu-west-1_111111111",
                    "sub": "11111111-1111-1111-1111-111111111111"
                }
            }
        },
        ...
    ]
}

Although the data is stored in Amazon S3 object storage, we are able to use SQL to access it with Amazon Athena. Since we now understand the JSON structure shown in the preceding code, we use Athena to define how to interpret the data that is relevant to us. We do so by first creating a database using the Athena Query Editor:

CREATE DATABASE analyze_labels_db;

Once inside the database, we add the table schema. The actual files remain on Amazon S3, but through the metadata catalog, Athena knows where the data lies and how to interpret it. The AWS Glue Data Catalog is a central repository that stores structural and operational metadata for all your data assets. For a given dataset, you can store its table definition and physical location, add business-relevant attributes, and track how the data has changed over time. Besides Athena, the AWS Glue Data Catalog also provides out-of-the-box integration with Amazon EMR and Amazon Redshift Spectrum. Once you add your table definitions to the Glue Data Catalog, they are available for ETL. They are also readily available for querying in Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum, so that you can have a common view of your data between these services.

When going from JSON to SQL, we are crossing format boundaries. To make the JSON-formatted data easier to read, we use SerDe properties to replace the hyphen in crowd-classifier with an underscore due to DDL constraints. Finally, we point the location to the Amazon S3 prefix containing the individual worker responses. Notice in the following script that we translate the nested structure of the JSON file into a hierarchical, nested data structure in the schema definition. We could also leave out the workerMetadata struct because we don't need it at this time. The data would still remain in the files on Amazon S3, so we could later add the workerMetadata STRUCT back into the table definition for our analysis.

CREATE EXTERNAL TABLE annotations_raw (
  answers array<
    struct<answercontent: 
      struct<crowd_classifier: 
        struct<label: string>
      >,
      submissionTime: string,
      workerId: string,
      workerMetadata: 
        struct<identityData: 
          struct<identityProviderType: string, issuer: string, sub: string>
        >
    >
  >
) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  "mapping.crowd_classifier"="crowd-classifier" 
) 
LOCATION 's3://<YOUR_BUCKET>/<JOB_NAME>/annotations/worker-response/'

Creating Views in Athena

Now, we have nested data in our annotations_raw table. For many use cases, especially for analytical uses, representing data in a tabular fashion—as rows—is more natural. This is also the standard way when using SQL and business intelligence tools. To unnest the hierarchical data into flattened rows, we create the following view, which will serve as the foundation for the other views we create. For an in-depth look into unnesting data with Amazon Athena, read this blog post.

Some of the information we're interested in is not part of the document itself, but is encoded in the path. We use the $path variable from the Presto Hive connector, which identifies the Amazon S3 file that contains the data returned by a specific row in an Athena table. This way we can find out which data object an annotation belongs to. Since Athena is built on top of Presto, we are able to use Presto's built-in regexp_extract function to extract the iteration as well as the data object id per labeling result. We also cast the submission time to a date to later determine the labeling progress per day.

CREATE OR REPLACE VIEW annotations_view AS
SELECT 
  regexp_extract("$path", 'iteration-[0-9]*') as iteration,
  regexp_extract("$path", '(iteration-[0-9]*\/([0-9]*))',2) as dataRecord,
  answer.answercontent.crowd_classifier.label,
  cast(from_iso8601_timestamp(answer.submissionTime) as timestamp) as submissionTime,
  cast(from_iso8601_timestamp(answer.submissionTime) as date) as submissionDay,
  answer.workerId,
  answer.workerMetadata.identityData.identityProviderType,
  answer.workerMetadata.identityData.issuer,
  answer.workerMetadata.identityData.sub,
  "$path" path
FROM 
  annotations_raw
CROSS JOIN UNNEST(answers) AS t(answer)

This view, annotations_view, will be the starting point for the other views we create further in this post.

Visualizing with QuickSight

In this section, we explore a way to visualize the views we build in Athena by pointing Amazon QuickSight to the respective view. Amazon QuickSight lets you create and publish interactive dashboards that include ML Insights. Dashboards can then be accessed from any device, and embedded into your applications, portals, and websites.

Thanks to the tight integration between Athena and QuickSight, we are able to map one dataset in QuickSight to one Athena view. In order to further optimize the performance of the dashboard, we can optionally import the datasets into the in-memory optimized calculation engine for Amazon QuickSight called SPICE. With the datasets in place we can now create an analysis in order to interact with the visuals we’re going to add. You can think of an analysis as a container for a set of related visuals. You can use multiple datasets in an analysis, although any given visual can only use one of those datasets. After you create an analysis and an initial visual, you can expand the analysis. You can do this for example by adding datasets and visuals.

Let’s start with our first insight.

Annotations per worker

We'd like to gain insights not only into the total number of labeled items, but also into the level of contribution of each individual worker. This could give us an indication of whether the labels were created by a diverse crowd of labelers or by a few productive ones. A largely disproportionate amount of contributions from a handful of workers could mean the labels carry any biases those workers brought along.

SageMaker Ground Truth calls a labeled data object an annotation; an annotation is the result of a single worker's labeling task.

Luckily we encapsulated all the heavy lifting of format conversion in the annotations_view, so that it is now easy to create a view for the annotations per user:

CREATE OR REPLACE VIEW annotations_per_user AS
SELECT COUNT(sub) AS LabeledItems,
sub AS User
FROM annotations_view
GROUP BY sub
ORDER BY LabeledItems DESC

Next we visualize this view in QuickSight. We add a visual to our analysis, select the respective dataset for the view and use the AutoGraph feature, which chooses the most appropriate visual type. Since we already arranged our view in Athena by the number of labeled items in descending order, there is no need now to sort the data in QuickSight. In the following screenshot, worker c4ef78e4... contributed more labels compared to their peers.

Annotations per worker

This view gives you an indicator to check for a bias that the leading worker might have brought along.

Annotations per label

One thing we want to be aware of is potential imbalances between classes in our dataset. Simple machine learning models in particular may learn to frequently predict a label that is massively overrepresented in the dataset. If we can identify an imbalance, we can apply mitigation actions such as upsampling data of underrepresented classes. With the following view, we list the total number of annotations per label.

CREATE OR REPLACE VIEW annotations_per_label AS
SELECT Count(dataRecord) AS TotalLabels, label As Label 
FROM annotations_view
GROUP BY label
ORDER BY TotalLabels DESC, Label;

As before, we create a dataset in QuickSight pointing to the annotations_per_label view, open the analysis, add a new visual and leverage the AutoGraph functionality. The result is the following visual representation:

Annotations per worker 2

One can clearly see that the Analytics & AI/ML class is massively underrepresented. At this point, you might want to try getting more data or think about upsampling data for that class.

Annotations per day

Seeing the total number of annotations per label and per worker is good, but we are also interested in how the labeling progress changes over time. This way we might see spikes related to labeler activations. We can also estimate how long it takes to reach a certain number of annotations given the current pace. For this purpose, we create the following view aggregating the total annotations per day.

CREATE OR REPLACE VIEW annotations_per_day AS
SELECT COUNT(datarecord) AS LabeledItems,
submissionDay
FROM annotations_view
GROUP BY submissionDay
ORDER BY submissionDay, LabeledItems DESC

This time the QuickSight AutoGraph provides us with the following line chart. You might have noticed that the axis labels do not match the column names in Athena. That is because we renamed them in QuickSight for better readability.

Total annotations per day

In the preceding chart we see that there is no consistent pace of labeling, which makes it hard to predict when a certain amount of labeled data will be reached. In this example, after a strong start the progress immediately went down. Knowing this, we might want to take action to motivate our workers to contribute more, and validate the effectiveness of these actions with the help of this chart. The spikes indicate an effective short-term action.

Distribution of total annotations by user

We already have insights into annotations per worker, per label, and per day. Let us now see what insights we can get from aggregating some of this information.

The bigger your labeling workforce gets, the harder it can become to see the whole picture. For that reason, we now create a histogram consisting of five buckets. Each bucket represents an interval of total annotations (for example, 0-25 annotations) mapped to the number of users whose total number of annotations lies in that interval. This gives us a sense of what kind of bias might be introduced when the majority of annotations are contributed by a small number of workers.

To do that, we use the Presto function width_bucket, which assigns each user's total number of labeled data objects to one of the five buckets we defined, each with a size of 25. We define these buckets by creating an array with five elements that specify the boundaries.

CREATE OR REPLACE VIEW users_per_bucket_annotations AS
SELECT 
bucket,numberOfUsers,
CASE
   WHEN bucket=5 THEN 'B' || cast(bucket AS VARCHAR(10)) || ': ' || cast(((bucket-1) * 25) AS VARCHAR(10)) || '+'
   ELSE 'B' || cast(bucket AS VARCHAR(10)) || ': ' || cast(((bucket-1) * 25) AS VARCHAR(10)) || '-' || cast((bucket * 25) AS VARCHAR(10))
END AS NumberOfAnnotations
FROM
(SELECT width_bucket(labeleditems,ARRAY[0,25,50,75,100]) AS bucket,
 count(user) AS numberOfUsers
FROM annotations_per_user
GROUP BY 1
ORDER BY bucket)

A SELECT * FROM users_per_bucket_annotations produces the following result:

A SELECT FROM users_per_bucket_annotations

Let’s now investigate the same data via QuickSight:

Annotations per User in buckets of Size 25

Now that we can look at the data visually it becomes clear that we have a bimodal distribution, with many labelers having done very little, and many labelers doing quite a lot. This may warrant interviewing some labelers to find out if there is something holding back users from progressing, or if we can keep engagement high over time.

Putting it all together in QuickSight

Since we created all the previous visuals in one analysis, we can now utilize it as a central place to consume our insights in a user-friendly way. Moreover, we can share our insights with others as a read-only snapshot, which QuickSight calls a dashboard. Users who are dashboard viewers can view and filter the dashboard data as shown below:

Groundtruth dashboard

Furthermore, you can generate a report and let QuickSight send it either once or on a schedule (daily, weekly or monthly) to your peers. This way users do not have to sign in and they can get reminders to check the progress of the labeling job. Lastly, sending out those reports is an opportunity to stay in touch with the labelers and keep the engagement high.

Conclusion

In this blogpost, we have shown one example of combining multiple AWS services in order to build a solution tailored to your needs. We took the Amazon S3 output generated by SageMaker Ground Truth and showed how it can be further processed and analyzed with Athena. Finally, we created a central place to consume our insights in a user-friendly way with QuickSight. By putting it all together in a dashboard we were able to share our insights with our peers.

You can take the same pattern and apply it to other situations: take some of the many building blocks AWS provides and mix-and-match them to create a unique, integrated solution with cohesive insights just as we did with Ground Truth, Athena, and QuickSight.

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

Automating bucketing of streaming data using Amazon Athena and AWS Lambda

Post Syndicated from Ahmed Saef Zamzam original https://aws.amazon.com/blogs/big-data/automating-bucketing-of-streaming-data-using-amazon-athena-and-aws-lambda/

In today’s world, data plays a vital role in helping businesses understand and improve their processes and services to reduce cost. You can use several tools to gain insights from your data, such as Amazon Kinesis Data Analytics or open-source frameworks like Structured Streaming and Apache Flink to analyze the data in real time. Alternatively, you can batch analyze the data by ingesting it into a centralized storage known as a data lake. Data lakes allow you to import any amount of data that can come in real time or batch. With Amazon Simple Storage Service (Amazon S3), you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% (11 9s) of durability.

After the data lands in your data lake, you can start processing this data using any Big Data processing tool of your choice. Amazon Athena is a fully managed interactive query service that enables you to analyze data stored in an Amazon S3-based data lake using standard SQL. You can also integrate Athena with Amazon QuickSight for easy visualization of the data.

When working with Athena, you can employ a few best practices to reduce cost and improve performance. Converting to columnar formats, partitioning, and bucketing your data are some of the best practices outlined in Top 10 Performance Tuning Tips for Amazon Athena. Bucketing is a technique that groups data based on specific columns together within a single partition. These columns are known as bucket keys. By grouping related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thus improving query performance and reducing cost. For example, imagine collecting and storing clickstream data. If you frequently filter or aggregate by user ID, then within a single partition it’s better to store all rows for the same user together. If user data isn’t stored together, then Athena has to scan multiple files to retrieve the user’s records. This leads to more files being scanned, and therefore, an increase in query runtime and cost.
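To make this concrete, the following is a minimal sketch of a bucketed CTAS statement in Athena for the clickstream example described above; the table names, columns, bucket count, and bucket location are hypothetical and would need to be adapted to your own schema.

-- Illustrative only: table and column names below are hypothetical.
-- All rows for the same user_id land in the same file within each dt partition.
CREATE TABLE clickstream_bucketed
WITH (
      format = 'PARQUET',
      external_location = 's3://<your_bucket>/clickstream_bucketed/',
      partitioned_by = ARRAY['dt'],
      bucketed_by = ARRAY['user_id'],
      bucket_count = 16)
AS SELECT user_id, page, event_time, dt   -- partition column dt must come last
FROM clickstream_raw;

With a layout like this, a query that filters on a single user_id only needs to read one of the 16 files in each dt partition instead of the whole partition.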

Like partitioning, columns that are frequently used to filter the data are good candidates for bucketing. However, unlike partitioning, with bucketing it’s better to use columns with high cardinality as a bucketing key. For example, Year and Month columns are good candidates for partition keys, whereas userID and sensorID are good examples of bucket keys. By doing this, you make sure that all buckets have a similar number of rows. For more information, see Bucketing vs Partitioning.

For real-time data (such as data coming from sensors or clickstream data), streaming tools like Amazon Kinesis Data Firehose can convert the data to columnar formats and partition it while writing to Amazon S3. With Kafka, you can do the same thing with connectors. But what about bucketing? This post shows how to continuously bucket streaming data using AWS Lambda and Athena.

Overview of solution

The following diagram shows the high-level architecture of the solution.

The architecture includes the following steps:

  1. We use the Amazon Kinesis Data Generator (KDG) to simulate streaming data. Data is then written into Kinesis Data Firehose, a fully managed service that enables you to load streaming data to an Amazon S3-based data lake.
  2. Kinesis Data Firehose partitions the data by hour and writes new JSON files into the current partition under the /raw prefix. Each new partition looks like /raw/dt=<YYYY-MM-dd-HH>. Every hour, a new partition is created.
  3. Two Lambda functions are triggered every hour by Amazon CloudWatch Events:
    • Function 1 (LoadPartition) runs every hour to load new /raw partitions to Athena SourceTable, which points to the /raw prefix.
    • Function 2 (Bucketing) runs the Athena CREATE TABLE AS SELECT (CTAS) query.
  4. The CTAS query copies the previous hour’s data from /raw to /curated and buckets the data while doing so. It loads the new data as a new partition to TargetTable, which points to the /curated prefix.

Overview of walkthrough

In this post, we cover the following high-level steps:

  1. Install and configure the KDG.
  2. Create a Kinesis Data Firehose delivery stream.
  3. Create the database and tables in Athena.
  4. Create the Lambda functions and schedule them.
  5. Test the solution.
  6. Create a view that combines data from both tables.
  7. Clean up.

Installing and configuring the KDG

First, we need to install and configure the KDG in our AWS account. To do this, we use the following AWS CloudFormation template.

For more information about installing the KDG, see the KDG Guide in GitHub.

To configure the KDG, complete the following steps:

  1. On the AWS CloudFormation console, locate the stack you just created.
  2. On the Outputs tab, record the value for KinesisDataGeneratorUrl.
  3. Log in to the KDG main page using the credentials created when you deployed the CloudFormation template.
  4. In the Record template section, enter the following template. Each record has three fields: sensorID, currentTemperature, and status.
    {
        "sensorId": {{random.number(4000)}},
        "currentTemperature": {{random.number(
            {
                "min":10,
                "max":50
            }
        )}},
        "status": "{{random.arrayElement(
            ["OK","FAIL","WARN"]
        )}}"
    }
    

  5. Choose Test template.

The result should look like the following screenshot.

We don’t start sending data now; we do this after creating all other resources.

Creating a Kinesis Data Firehose delivery stream

Next, we create the Kinesis Data Firehose delivery stream that is used to load the data to the S3 bucket.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose.
  2. Choose Create delivery stream.
  3. For Delivery stream name, enter a name, such as AutoBucketingKDF.
  4. For Source, select Direct PUT or other sources.
  5. Leave all other settings at their default and choose Next.
  6. On the Process records page, leave everything at its default and choose Next.
  7. Choose Amazon S3 as the destination and choose your S3 bucket from the drop-down menu (or create a new one). For this post, I already have a bucket created.
  8. For S3 Prefix, enter the following prefix:
    raw/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}-!{timestamp:HH}/

We use custom prefixes to tell Kinesis Data Firehose to create a new partition every hour. Each partition looks like this: dt=YYYY-MM-dd-HH. This partition-naming convention conforms to the Hive partition-naming convention, <PartitionKey>=<PartitionValue>. In this case, <PartitionKey> is dt and <PartitionValue> is YYYY-MM-dd-HH. By doing this, we implement a flat partitioning model instead of hierarchical (year=YYYY/month=MM/day=dd/hour=HH) partitions. This model can be much simpler for end-users to work with, and you can use a single column (dt) to filter the data. For more information on flat vs. hierarchical partitions, see Data Lake Storage Foundation on GitHub.
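As an illustration of filtering on the single dt column, a query against the SourceTable created later in this post could look like the following sketch; the hour value is a placeholder.

-- Placeholder hour value; dt holds the full date-hour string written by Kinesis Data Firehose
SELECT count(*)
FROM mydatabase.SourceTable
WHERE dt = '2020-06-01-13';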

  9. For S3 error prefix, enter the following code:
    myFirehoseFailures/!{firehose:error-output-type}/

  10. On the Settings page, leave everything at its default.
  11. Choose Create delivery stream.

Creating an Athena database and tables

In this solution, the Athena database has two tables: SourceTable and TargetTable. Both tables have identical schemas and will have the same data eventually. However, each table points to a different S3 location. Moreover, because data is stored in different formats, Athena uses a different SerDe for each table to parse the data. SourceTable uses JSON SerDe and TargetTable uses Parquet SerDe. One other difference is that SourceTable’s data isn’t bucketed, whereas TargetTable’s data is bucketed.

In this step, we create both tables and the database that groups them.

  1. On the Athena console, create a new database by running the following statement:
    CREATE DATABASE mydatabase

  2. Choose the database that was created and run the following query to create SourceTable. Replace <s3_bucket_name> with the bucket name you used when creating the Kinesis Data Firehose delivery stream.
    CREATE EXTERNAL TABLE mydatabase.SourceTable(
      sensorid string, 
      currenttemperature int, 
      status string)
    PARTITIONED BY ( 
      dt string)
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://<s3_bucket_name>/raw/'
    

  3. Run the following CTAS statement to create TargetTable:
    CREATE TABLE TargetTable
    WITH (
          format = 'PARQUET', 
          external_location = 's3://<s3_bucket_name>/curated/', 
          partitioned_by = ARRAY['dt'], 
          bucketed_by = ARRAY['sensorID'], 
          bucket_count = 3) 
    AS SELECT *
    FROM SourceTable

SourceTable doesn't have any data yet. However, the preceding query creates the table definition in the Data Catalog. We configured this data to be bucketed by sensorID (the bucketing key) with a bucket count of 3. Ideally, the number of buckets should be chosen so that the resulting files are of optimal size.

Creating Lambda functions

The solution has two Lambda functions: LoadPartition and Bucketing. We use an AWS Serverless Application Model (AWS SAM) template to create, deploy, and schedule both functions.

Follow the instructions in the GitHub repo to deploy the template. When deploying the template, it asks you for some parameters. You can use the default parameters, but you have to change S3BucketName and AthenaResultLocation. For more information, see Parameter Details in the GitHub repo.

LoadPartition function

The LoadPartition function is scheduled to run at the first minute of every hour. Every time Kinesis Data Firehose creates a new partition in the /raw folder, this function loads the new partition into SourceTable. This is crucial because the second function (Bucketing) reads this partition the following hour to copy the data to /curated.
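As a rough sketch of what this step amounts to in SQL (the actual statement submitted by the function lives in the GitHub repo and may differ), loading one hourly partition into SourceTable can be expressed as an ALTER TABLE ... ADD PARTITION statement:

-- Hypothetical example for one hour; the function computes the dt value at runtime
ALTER TABLE SourceTable ADD IF NOT EXISTS
PARTITION (dt = '2020-06-01-13')
LOCATION 's3://<s3_bucket_name>/raw/dt=2020-06-01-13/';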

Bucketing function

The Bucketing function is scheduled to run the first minute of every hour. It copies the last hour’s data from SourceTable to TargetTable. It does so by creating a tempTable using a CTAS query. This tempTable points to the new date-hour folder under /curated; this folder is then added as a single partition to TargetTable.

To implement this, the function runs three queries sequentially. The queries use two parameters:

  • <s3_bucket_name> – Defined by an AWS SAM parameter and should be the same bucket used throughout this solution
  • <last_hour_partition> – Is calculated by the function depending on which hour it’s running

The function first creates TempTable as the result of a SELECT statement from SourceTable. It stores the results in a new folder under /curated. The results are bucketed and stored in Parquet format. See the following code:

CREATE TABLE TempTable
    WITH (
      format = 'PARQUET', 
      external_location = 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/', 
      bucketed_by = ARRAY['sensorID'], 
      bucket_count = 3) 
    AS SELECT *
    FROM SourceTable
    WHERE dt='<last_hour_partition>';

We create a new subfolder in /curated, which is a new partition for TargetTable. So, after the TempTable creation is complete, we load the new partition into TargetTable:

ALTER TABLE TargetTable
                ADD IF NOT EXISTS
                PARTITION (dt='<last_hour_partition>');

Finally, we delete tempTable from the Data Catalog:

DROP TABLE TempTable

Testing the solution

Now that we have created all resources, it’s time to test the solution. We start by generating data from the KDG and waiting for an hour to start querying data in TargetTable (the bucketed table).

  1. Log in to the KDG. You should find the template you created earlier. For the configuration, choose the following:
    1. The Region used.
    2. For the delivery stream, choose the Kinesis Data Firehose you created earlier.
    3. For records/sec, enter 3000.
  2. Choose Send data.

The KDG starts sending simulated data to Kinesis Data Firehose. After 1 minute, a new partition should be created in Amazon S3.

The Lambda function that loads the partition to SourceTable runs on the first minute of the hour. If you started sending data after the first minute, this partition is missed because the next run loads the next hour’s partition, not this one. To mitigate this, run MSCK REPAIR TABLE SourceTable only for the first hour.

  1. To benchmark the performance between both tables, wait for an hour so that the data is available for querying in TargetTable.
  2. When the data is available, choose one sensorID and run the following query on SourceTable and TargetTable.
    SELECT sensorID, avg(currenttemperature) as AverageTemperature
    FROM <TableName>
    WHERE dt='<YYYY-MM-dd-HH>' AND sensorID ='<sensorID_selected>'
    GROUP BY 1

The following screenshot shows the query results for SourceTable. It shows the runtime in seconds and amount of data scanned.

The following screenshot shows the query results for TargetTable.

If you look at these results, you don't see a huge difference in runtime for this specific query and dataset; for other datasets, this difference should be more significant. However, from a data-scanning perspective, after bucketing the data we reduced the data scanned by approximately 98%. Therefore, for this specific use case, bucketing the data led to a 98% reduction in Athena costs, because you're charged based on the amount of data scanned by each query.

Querying the current hour’s data

Data for the current hour isn’t available immediately in TargetTable. It’s available for querying after the first minute of the following hour. To query this data immediately, we have to create a view that UNIONS the previous hour’s data from TargetTable with the current hour’s data from SourceTable. If data is required for analysis after an hour of its arrival, then you don’t need to create this view.

To create this view, run the following query in Athena:

CREATE OR REPLACE VIEW combined AS

SELECT *, "$path" AS file
FROM SourceTable
WHERE dt >= date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

UNION ALL 

SELECT *, "$path" AS file
FROM TargetTable
WHERE dt < date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

Cleaning up

Delete the resources you created if you no longer need them.

  1. Delete the Kinesis Data Firehose delivery stream.
  2. In Athena, run the following statements
    1. DROP DATABASE mydatabase
    2. DROP TABLE SourceTable
    3. DROP TABLE TargetTable
  3. Delete the AWS SAM template to delete the Lambda functions.
  4. Delete the CloudFormation stack for the KDG. For more information, see Deleting a stack on the AWS CloudFormation console.

Conclusion

Bucketing is a powerful technique and can significantly improve performance and reduce Athena costs. In this post, we saw how to continuously bucket streaming data using Lambda and Athena. We used a simulated dataset generated by the Kinesis Data Generator. The same solution can be applied to any production data, with the following changes:

  • The DDL statements
  • The Lambda functions work with data that is partitioned by hour with the partition key dt and partition value <YYYY-MM-dd-HH>. If your data is partitioned in a different way, edit the Lambda functions accordingly.
  • The frequency of the Lambda triggers

About the Author

Ahmed Zamzam is a Solutions Architect with Amazon Web Services. He supports SMB customers in the UK in their digital transformation and their cloud journey to AWS, and specializes in Data Analytics. Outside of work, he loves traveling, hiking, and cycling.


Configure and optimize performance of Amazon Athena federation with Amazon Redshift

Post Syndicated from Harsha Tadiparthi original https://aws.amazon.com/blogs/big-data/configure-and-optimize-performance-of-amazon-athena-federation-with-amazon-redshift/

This post provides guidance on how to configure Amazon Athena federation with AWS Lambda and Amazon Redshift, while addressing performance considerations to ensure proper use.

If you use data lakes in Amazon Simple Storage Service (Amazon S3) and use Amazon Redshift as your data warehouse, you may want to integrate the two for a lake house approach, which seamlessly integrates the data lake and the data warehouse. When you need to query your data lake from your Amazon Redshift data warehouse, you can use Amazon Redshift Spectrum, which works great in unifying your data lake and data warehouse. However, when you use Athena in the data lake and need to access data in Amazon Redshift, there is no easy approach for the following two commonly seen scenarios:

  • Team A has a data lake in Amazon S3 and uses Athena. They need access to the data in an Amazon Redshift cluster owned by Team B.
  • Analysts using Athena to query their data lake for analytics need agility and flexibility to access data in an Amazon Redshift data warehouse without moving the data to Amazon S3 Data Lake.

In these scenarios, Athena federation with Amazon Redshift allows you to seamlessly access the data in your Amazon Redshift data warehouse without having to wait to unload the data to the Amazon S3 data lake, which removes the overhead in managing such jobs.

In this post, you walk through a step-by-step configuration to set up Athena federation using Lambda to access data in Amazon Redshift. You also see a performance benchmark analysis of interactive and ad hoc TPC-DS queries, and learn some key performance considerations and best practices when using federation.

Solution overview

Data federation is the capability to integrate data in another data store using a single interface. The following diagram depicts how Athena federation works by using Lambda to integrate with a federated data source.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.

Lambda lets you run code without provisioning or managing servers. You can run code for virtually any type of application with zero administration and only pay for when the code is running.

Amazon Redshift is a petabyte-scale data warehouse designed from the ground up, natively for the cloud. Amazon Redshift is the most popular and fastest cloud data warehouse. It’s integrated with your data lake, offers performance up to three times faster than any other data warehouse, and costs up to 75% less than any other cloud data warehouse.

The following diagram depicts all the data source connectors available as of this writing in the AWS Serverless Application Repository.

The AWS Serverless Application Repository is a managed repository for serverless applications. It enables you to store and share reusable applications, and easily assemble and deploy serverless architectures in powerful new ways.

You can also create a custom connector for sources that aren’t in the AWS Serverless Application Repository.

Prerequisites

Before you get started, create a secret for the Amazon Redshift login ID and password using AWS Secrets Manager.

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. Choose credentials for your Amazon Redshift cluster, and set your user name and password.
  4. Choose the cluster you want to use.
  5. For Secret name, enter a name for your secret. Use the prefix AthenaJDBCFederation so it’s easy to find.
  6. Leave the remaining fields at their defaults and choose Next.
  7. Complete your secret creation.

Setting up your S3 bucket

On the Amazon S3 console, create a new S3 bucket and subfolder for Lambda to use. For this post, use the name myworkspace0009/athenafederation.

Configuring Athena federation with Amazon Redshift

To configure Athena federation with Amazon Redshift, complete the following steps:

  1. On the AWS Serverless Application Repository, choose Available applications.
  2. In the search field, enter athena federation.

  3. Choose the connector application from the search results.
  4. In the Application settings section, provide the following details:
    • Application name – AthenaRedshiftConnector
    • SecretNamePrefix – AthenaJdbcFederation
    • SpillBucket – myworkspace0009/athenafederation
    • JDBCConnectorConfig – Redshift://jdbc:Redshift://<YourAmazonRedshiftHostname>:5439/<DBName>?user=sample2&password=sample2
    • DisableSpillEncryption – False
    • LambdaFunctionName – rstpcds30
    • SecurityGroupID – Security group ID where Amazon Redshift is deployed
    • SpillPrefix – Leave default
    • Subnetids – Use the subnets where Amazon Redshift is running, with comma separation
  5. Select the I acknowledge check box.
  6. Choose Deploy.

In the next steps, you configure an Amazon Virtual Private Cloud (Amazon VPC) endpoint for Amazon S3 to allow Lambda to write federated query results to Amazon S3.

  1. On the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. Choose the VPC for your endpoint.
  4. Make any necessary security changes as per your security requirements.
  5. Choose Create endpoint.

Running federated queries with Athena

To start running federated queries, complete the following steps:

  1. On the Athena console, choose Workgroups.
  2. If you don’t see a workgroup called AmazonAthenaPreviewFunctionality, create one.

When this feature becomes generally available, you won’t need to use this workgroup name.

  3. Run your queries, using lambda:rstpcds30 to run against tables in Amazon Redshift (see the example query that follows).
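For illustration, a federated query run in the AmazonAthenaPreviewFunctionality workgroup might look like the following sketch; the Redshift schema name and the choice of TPC-DS query are assumptions, and only store_sales is read through the connector (matching the T2 test described later):

-- store_sales is read from Amazon Redshift through the Lambda connector;
-- item and date_dim are regular data lake tables in Amazon S3.
SELECT i.i_category,
       sum(ss.ss_net_paid) AS total_net_paid
FROM "lambda:rstpcds30".<schema_name>.store_sales ss
JOIN item i ON ss.ss_item_sk = i.i_item_sk
JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
WHERE d.d_year = 2002
GROUP BY i.i_category
ORDER BY total_net_paid DESC;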

Athena query performance comparison

Several customers have asked us for performance insights and prescriptive guidance on how queries in Athena compare against federated queries and how to use them. In this section, we use a TPC-DS 3 TB standard dataset and a select few queries that fall in the category of ad hoc and interactive. The comparison of their performance should give you an idea of what to expect when running federated queries against Amazon Redshift.

For the following tests, we used a 3 TB TPC-DS dataset in an Amazon S3 data lake, stored in compressed Parquet format, partitioned, and served by Athena, and the same 3 TB TPC-DS dataset on an Amazon Redshift cluster running four RA3.4XL nodes.

The following table summarizes the dataset sizes:

Dataset                  Table Size (Records)
store_sales              8.6 billion
customer                 30 million
customer_address         15 million
customer_demographics    1.92 million
item                     360,000
date_dim                 73,000
store                    1,350

We ran the following four tests:

  • T1 – Queries ran in Athena without federation. All table data is in Amazon S3.
  • T2 – Queries ran in Athena with federation to Amazon Redshift. All table data is in Amazon S3, except the store_sales fact table in Amazon Redshift.
  • T3 – Queries ran in Athena with federation to Amazon Redshift. All tables and data are in Redshift.
  • T4 – Queries ran in Amazon Redshift without federation. All tables and data are in Redshift.

The following graph represents the performance of some of the ad hoc and interactive TPC-DS queries.

In the preceding graph, all T3 queries timed out at 900 seconds, depicted by the pink reference line, due to the Lambda 900-second timeout limit. This is due to overhead from store_sales fact data that needed to be transferred back to Athena.

The following graph removes T3 from the visualization, which gives better visibility when comparing the other tests.

Notice that the T1 and T2 queries completed in almost the same time, while the T4 queries ran significantly faster.

Amazon Redshift beats the performance of Athena in providing extremely low latency and should be the tool of choice if you’re looking for very low SLAs for analytics queries that Athena can’t achieve.

The following graph shows the data scanned in Amazon S3 for T1 and T2, which outlines why there isn’t much difference in query performance when compared to federated queries.

For the T2 federated queries, a small amount of dimension data is filtered in Amazon Redshift and brought back to Athena, instead of scanning the entire dimension tables. This is typical for many ad hoc and interactive queries.

The performance of these TPC-DS queries between T1 and T2 is comparable because very little data is transferred back to Athena. You can see a similar behavior in several ad hoc and interactive query use cases because they use limited dimensions and scan a small subset of dimension data. Due to the 900-second timeout for the Lambda instances that connect to Amazon Redshift, it’s advised to minimize the amount of data the query brings back. Although Athena uses multiple Lambda instances in parallel to run your federated query, it’s also important to make sure the Amazon Redshift WLM queue has enough slots to process it, thereby not leading to queue wait time. For example, in some of the preceding queries, 20 Lambda executions were connecting to Amazon Redshift concurrently.

Key performance best practice considerations

When considering Athena federation with Amazon Redshift, take the following best practices into account:

  • Athena federation works great for queries with predicate filtering because the predicates are pushed down to Amazon Redshift. Use filter and limited-range scans in your queries to avoid full table scans.
  • If your SQL query requires returning a large volume of data from Amazon Redshift to Athena (which could lead to query timeouts or slow performance), unload the large tables in your query from Amazon Redshift to your Amazon S3 data lake (a sketch of such an unload follows this list).
  • Star schema is a commonly used data model in Amazon Redshift. In the star schema model, unload your large fact tables into your data lake and leave the dimension tables in Amazon Redshift. If large dimension tables are contributing to slow performance or query timeouts, unload those tables to your data lake.
  • When you run federated queries, Athena spins up multiple Lambda functions, which causes a spike in database connections. It's important to monitor the Amazon Redshift WLM queue slots to ensure there is no queuing. Additionally, you can use concurrency scaling on your Amazon Redshift cluster to automatically add capacity when concurrent connections queue up.
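The following is a minimal sketch of unloading a large fact table from Amazon Redshift to the data lake in Parquet format, as suggested in the second and third points above; the bucket path, IAM role, and table name are placeholders.

-- Run in Amazon Redshift; writes Parquet files that Athena can then query directly
UNLOAD ('SELECT * FROM store_sales')
TO 's3://<your_data_lake_bucket>/tpcds/store_sales/'
IAM_ROLE 'arn:aws:iam::<account_id>:role/<redshift_unload_role>'
FORMAT AS PARQUET;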

Conclusion

In this post, you learned how to configure and use Athena federation with Amazon Redshift using Lambda. Now you don’t need to wait for all the data in your Amazon Redshift data warehouse to be unloaded to Amazon S3 and maintained on a day-to-day basis to run your queries. You can use the best practice considerations outlined in the post to minimize the data transferred from Amazon Redshift for better performance. When queries are well written for federation, the performance penalties are negligible, as observed in the TPC-DS benchmark queries in this post. Happy query federating!

 


About the Author

Harsha Tadiparthi is a Specialist Sr. Solutions Architect, AWS Analytics. He enjoys solving complex customer problems in Databases and Analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.


Enhancing customer safety by leveraging the scalable, secure, and cost-optimized Toyota Connected Data Lake

Post Syndicated from Sandeep Kulkarni original https://aws.amazon.com/blogs/big-data/enhancing-customer-safety-by-leveraging-the-scalable-secure-and-cost-optimized-toyota-connected-data-lake/

Toyota Motor Corporation (TMC), a global automotive manufacturer, has made “connected cars” a core priority as part of its broader transformation from an auto company to a mobility company. In recent years, TMC and its affiliate technology and big data company, Toyota Connected, have developed an array of new technologies to provide connected services that enhance customer safety and the vehicle ownership experience. Today, Toyota’s connected cars come standard with an on-board Data Communication Module (DCM) that links to a Controller Area Network (CAN). By using this hardware, Toyota provides various connected services to its customers.

Some of the connected services help drivers to safely enjoy their cars. Telemetry data is available from the car 24×7, and Toyota makes the data available to its dealers (when their customers opt-in for data sharing). For instance, a vehicle’s auxiliary battery voltage declines over time. With this data, dealership staff can proactively contact customers to recommend a charge prior to experiencing any issues. This automotive telemetry can also help fleet management companies monitor vehicle diagnostics, perform preventive maintenance and help avoid breakdowns.

There are other services such as usage-based auto insurance that leverage driving behavior data that can help safe drivers receive discounts on their car insurance. Telemetry plays a vital role in understanding driver behavior. If drivers choose to opt-in, a safety score can be generated based on their driving data and drivers can use their smartphones to check their safe driving scores.

A vehicle generates data every second, which can be bundled into larger packets at one-minute intervals. With millions of connected cars that have data points available every second, the incredible scale required to capture and store that data is immense—there are billions of messages daily generating petabytes of data. To make this vision a reality, Toyota Connected’s Mobility Team embarked on building a real-time “Toyota Connected Data Lake.” Given the scale, we leveraged AWS to build this platform. In this post, we show how we built the data lake and how we provide significant value to our customers.

Overview

The guiding principles for architecture and design that we used are as follows:

  • Serverless: We want to use cloud native technologies and spend minimal time on infrastructure maintenance.
  • Rapid speed to market: We work backwards from customer requirements and iterate frequently to develop minimally viable products (MVPs).
  • Cost-efficient at scale.
  • Low latency: near real time processing.

Our data lake needed to be able to:

  • Capture and store new data (relational and non-relational) at petabyte scale in real time.
  • Provide analytics that go beyond batch reporting and incorporate real time and predictive capabilities.
  • Democratize access to data in a secure and governed way, allowing our team to unleash their creative energy and deliver innovative solutions.

The following diagram shows the high-level architecture

Walkthrough

We built the serverless data lake with Amazon S3 as the primary data store, given the scalability and high availability of S3. The entire process is automated, which reduces the likelihood of human error, increases efficiency, and ensures consistent configurations over time, as well as reduces the cost of operations.

The key components of a data lake include Ingest, Decode, Transform, Analyze, and Consume:

  • Ingest: Connected vehicles send telemetry data once a minute, which includes speed, acceleration, turns, geo location, fuel level, and diagnostic error codes. This data is ingested into Amazon Kinesis Data Streams, processed through AWS Lambda to make it readable, and the “raw copy” is saved through Amazon Kinesis Data Firehose into an S3 bucket.
  • Decode: Data arriving into the Kinesis data stream in the ‘Decode’ pillar is decoded by a serverless Lambda function, which does most of the heavy lifting. Based upon a proprietary specification, this Lambda function does the bit-by-bit decoding of the input message to capture the particular sensor values. The small input payload of 35 KB with data from over 180 sensors is now decoded and converted to a JSON message of 3 MB. This is then compressed and written to the ‘Decoded S3 bucket’.
  • Transform: The aggregation jobs leverage the massively parallel capability of Amazon EMR, decrypt the decoded messages, and convert the data to Apache Parquet. Apache Parquet is a columnar storage file format designed for querying large amounts of data, regardless of the data processing framework or programming language. Parquet allows for better compression, which reduces the amount of storage required. It also reduces I/O, since we can efficiently scan the data. The data sets are now available for analytics purposes, partitioned by masked identification numbers as well as by automotive models and dispatch type. A separate set of jobs transforms the data and stores it in Amazon DynamoDB to be consumed in real time from APIs.
  • Consume: Applications needing to consume the data make API calls through Amazon API Gateway. Authentication for the API calls is based on temporary tokens issued by Amazon Cognito.
  • Analyze: Data analytics can be performed directly on Amazon S3 by leveraging serverless Amazon Athena (an illustrative query follows this list). Data access is democratized and made available to data science groups, who build and test various models that provide value to our customers.
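As a rough illustration of the Analyze step above, an Athena query over the transformed Parquet data sets could look like the following sketch; the table name, column names, and partition values are purely hypothetical and do not reflect the actual schema.

-- Hypothetical table and columns, for illustration only
SELECT vehicle_model,
       avg(speed_kmh) AS avg_speed,
       count(*)       AS message_count
FROM telemetry_parquet
WHERE dispatch_type = 'standard'
  AND dt BETWEEN '2020-06-01' AND '2020-06-07'
GROUP BY vehicle_model
ORDER BY message_count DESC;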

Additionally, comprehensive monitoring is set up by leveraging Amazon CloudWatch, Amazon ES, and AWS KMS for managing the keys securely.

Scalability

The scalability capabilities of the building blocks in our architecture that allow us to reach this massive scale are:

  • S3: S3 is a massively scalable key-based object store that is well-suited for storing and retrieving large datasets. S3 partitions the index based on key name. To maximize performance of high-concurrency operations on S3, we introduced randomness into each of the Parquet object keys to increase the likelihood that the keys are distributed across many partitions.
  • Lambda: We can run as many concurrent functions as needed and can raise limits as required with AWS support.
  • Kinesis Firehose: It scales elastically based on volume without requiring any human intervention. We batch requests up to 128MiB or 15 minutes, whichever comes earlier to avoid small files. Additional details are available in Srikanth Kodali’s blog post.
  • Kinesis Data Streams: We developed an automated program that adjusts the shards based on incoming volume. This is based on the Kinesis Scaling Utility from AWS Labs, which allows us to scale in a way similar to EC2 Auto Scaling groups.
  • API Gateway: automatically scales to billions of requests and seamlessly handles our API traffic.
  • EMR cluster: We can programmatically scale out to hundreds of nodes based on our volume and scale in after processing is completed.

Our volumes have increased seven-fold since we migrated to AWS and we have only adjusted the number of shards in Kinesis Data Streams and the number of core nodes for EMR processing to scale with the volume.

Security in the AWS cloud

AWS provides a robust suite of security services, allowing us to have a higher level of security in the AWS cloud. Consistent with our security guidelines, data is encrypted both in transit and at rest. Additionally, we use VPC Endpoints, allowing us to keep traffic within the AWS network.

Data protection in transit:

Data protection at rest:

  • S3 server-side encryption handles all encryption, decryption and key management transparently. All user data stored in DynamoDB is fully encrypted at rest, for which we use an AWS-owned customer master key at no additional charge. Server-side encryption for Kinesis Data streams and Kinesis Data Firehose is also enabled to ensure that data is encrypted at rest.

Cost optimization

Given our very large data volumes, we were methodical about optimizing costs across all components of the infrastructure. The ultimate goal was to figure out the cost of the APIs we were exposing. We developed a robust cost model validated with performance testing at production volumes:

  • NAT gateway: When we started this project, one of the significant cost drivers was traffic flowing from Lambda to Kinesis Data Firehose that went over the NAT gateway, since Kinesis Data Firehose did not have a VPC endpoint. Traffic flowing through the NAT gateway costs $0.045/GB, whereas traffic flowing through the VPC endpoint costs $0.01/GB. Based on a product feature request from Toyota, AWS implemented this feature (VPC Endpoint for Firehose) early this year. We implemented this feature, which resulted in a four-and-a-half-fold reduction in our costs for data transfer.
  • Kinesis Data Firehose: Since Kinesis Data Firehose did not initially support encryption of data at rest, we had to use client-side encryption with KMS; this was the second significant cost driver. We requested a feature for native server-side encryption in Kinesis Data Firehose. This was released earlier this year, and we enabled server-side encryption on the Kinesis Data Firehose stream. This removed the client-side AWS Key Management Service (KMS) encryption step, resulting in another 10% reduction in our total costs.

Since Kinesis Data Firehose charges based on the amount of data ingested ($0.029/GB), our Lambda function compresses the data before writing to Kinesis Data Firehose, which saves on the ingestion cost.

  • S3 – We use lifecycle policies to move data from S3 (which costs $0.023/GB) to Amazon S3 Glacier (which costs $0.004/GB) after a specified duration. Glacier provides a six-fold cost reduction over S3. We further plan to move the data from Glacier to Amazon S3 Glacier Deep Archive (which costs $0.00099/GB), which will provide us a four-fold reduction over Glacier costs. Additionally, we have set up automated deletes of certain data sets at periodic intervals.
  • EMR – We were planning to use AWS Glue and keep the architecture serverless, but made the decision to leverage EMR from a cost perspective. We leveraged spot instances for transformation jobs in EMR, which can provide up to 60% savings. The hourly jobs complete successfully with spot instances; however, the nightly aggregation jobs leveraging r5.4xlarge instances failed frequently because sufficient spot capacity was not available. We decided to move to “on-demand” instances, while we finalize our strategy for “reserved instances” to reduce costs.
  • DynamoDB: Time to Live (TTL) for DynamoDB lets us define when items in a table expire so that they can be automatically deleted from the database. We enabled TTL to expire objects that are not needed after a certain duration. We plan to use reserved capacity for read and write capacity units to reduce costs. We also use DynamoDB auto scaling, which helps us manage capacity efficiently and lowers the cost of our workloads because they have a predictable traffic pattern. In Q2 of 2019, DynamoDB removed the associated costs of DynamoDB Streams used in replicating data globally, which translated to extra cost savings in global tables.
  • Amazon DynamoDB Accelerator (DAX): Our DynamoDB tables are front-ended by DAX, which improves the response time of our application by dramatically reducing read latency, as compared to using DynamoDB alone. Using DAX, we also lower the cost of DynamoDB by reducing the amount of provisioned read throughput needed for read-heavy applications.
  • Lambda: We ran benchmarks to arrive at the optimal memory configuration for Lambda functions. Memory allocation in Lambda determines CPU allocation and for some of our Lambda functions, we allocated higher memory, which results in faster execution, thereby reducing the amount of GB-seconds per function execution, which saves time and cost. Using DynamoDB Accelerator (DAX) from  Lambda has several benefits for serverless applications that also use DynamoDB. DAX can improve the response time of your application by dramatically reducing read latency, as compared to using DynamoDB. For serverless applications, combining Lambda with DAX provides an additional benefit: Lower latency results in shorter execution times, which means lower costs for Lambda.
  • Kinesis Data Streams: We scale our streams through an automated job, since our traffic patterns are fairly predictable. During peak hours we add additional shards and remove them during off-peak hours, which reduces costs when the extra shards are not needed.
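
The scaling job itself is not shown in this post; the following minimal boto3 sketch (stream name, schedule, and shard counts are hypothetical) illustrates the kind of scheduled call such a job can make to resize a stream for peak and off-peak hours:

import datetime

import boto3

kinesis = boto3.client("kinesis")

PEAK_SHARDS = 8      # hypothetical peak-hour capacity
OFF_PEAK_SHARDS = 2  # hypothetical off-peak capacity

def rescale(stream_name="vehicle-telemetry"):
    """Scale the stream up during peak hours (06:00-22:00 UTC) and down otherwise."""
    hour = datetime.datetime.utcnow().hour
    target = PEAK_SHARDS if 6 <= hour < 22 else OFF_PEAK_SHARDS
    kinesis.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=target,
        ScalingType="UNIFORM_SCALING",
    )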

Enhancing customer safety

The Data Lake presents multiple opportunities to enhance customer safety. Early detection of market defects, and pinpointing the target vehicles affected by those defects, is made possible through the telemetry data ingested from the vehicles. This early detection leads to resolution well before the customer is affected. On-board software in the automobiles can be constantly updated over-the-air (OTA), saving time and cost. The automobile can also generate a health check report based on the driving style of its drivers, which can be used to create an ideal maintenance plan for worry-free driving.

The driving data for an individual driver, based on speed, sharp turns, rapid acceleration, and sudden braking, can be converted into a driver score that ranges from 1 to 100. The higher the driver score, the safer the driver. Drivers can view their scores on mobile devices and monitor the specific locations of harsh driving on the journey map. They can then use this input to self-correct and modify their driving habits to improve their scores, which not only results in a safer environment but can also qualify drivers for lower insurance rates. This also gives parents an opportunity to monitor the scores of their teenage drivers and coach them appropriately on safe driving habits. Additionally, notifications can be generated if a teenage driver exceeds an agreed-upon speed or leaves a specific area.

Summary

The automated serverless data lake is a robust, scalable platform that allows us to analyze data as it becomes available in real time. From an operations perspective, our costs are down significantly. Several aggregation jobs that took 15+ hours to run now finish in 1/40th of the time. We are impressed with the reliability of the platform that we built. The architectural decision to go serverless has reduced operational burden and will also allow us to keep a good handle on our costs going forward. Additionally, we can deploy this pipeline in other geographies with smaller volumes and only pay for what we consume.

Our team accomplished this ambitious development in a short span of six months. They worked in an agile, iterative fashion and continued to deliver robust MVPs to our business partners. Working with the service teams at AWS on product feature requests and seeing them come to fruition in a very short time frame has been a rewarding experience and we look forward to the continued partnership on additional requests.

 


About the Authors


Sandeep Kulkarni is an enterprise architect at AWS. His passion is to accelerate digital transformation for customers and build highly scalable and cost-effective solutions in the cloud. In his spare time, he loves to do yoga and gardening.

Shravanthi Denthumdas is the director of mobility services at Toyota Connected. Her team is responsible for building the Data Lake and delivering services that allow drivers to safely enjoy their cars. In her spare time, she likes to spend time with her family and children.

Optimize Python ETL by extending Pandas with AWS Data Wrangler

Post Syndicated from Satoshi Kuramitsu original https://aws.amazon.com/blogs/big-data/optimize-python-etl-by-extending-pandas-with-aws-data-wrangler/

Developing extract, transform, and load (ETL) data pipelines is one of the most time-consuming steps to keep data lakes, data warehouses, and databases up to date and ready to provide business insights. You can categorize these pipelines into distributed and non-distributed, and the choice of one or the other depends on the amount of data you need to process.

Apache Spark is widely used to build distributed pipelines, whereas Pandas is preferred for lightweight, non-distributed pipelines. With the second use case in mind, the AWS Professional Services team created AWS Data Wrangler, aiming to fill the integration gap between Pandas and several AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, AWS Glue, Amazon Athena, Amazon Aurora, Amazon QuickSight, and Amazon CloudWatch Logs Insights.

AWS Data Wrangler is an open-source Python library that enables you to focus on the transformation step of ETL by using familiar Pandas transformation commands and relying on abstracted functions to handle the extraction and load steps.

You can use AWS Data Wrangler in different environments on AWS and on premises (for more information, see Install). This post focuses on data preparation for a data science project on Jupyter. By the end of this walkthrough, you will be able to set up AWS Data Wrangler on your Amazon SageMaker notebook.

Use case overview

In the following walkthrough, you use data stored in the NOAA public S3 bucket. For more information, see NOAA Global Historical Climatology Network Daily. The objective is to convert 10 CSV files (approximately 240 MB total) to a partitioned Parquet dataset, store its related metadata in the AWS Glue Data Catalog, and query the data with Athena for analysis.

Configuring Amazon S3

Your first step is to create an S3 bucket to store the Parquet dataset.

  1. On the Amazon S3 console, choose Create bucket.
  2. For Bucket name, enter a name for your bucket.

  3. Choose Create.

Creating a new database in the Data Catalog

The Data Catalog is an Apache Hive-compatible managed metadata storage that lets you store, annotate, and share metadata on AWS.

For this use case, you use it to store the metadata associated with your Parquet dataset. The Data Catalog is integrated with many analytics services, including Athena, Amazon Redshift Spectrum, and Amazon EMR (Apache Spark, Apache Hive, and Presto).

  1. On the AWS Glue console, choose Databases.
  2. Choose Add database.
  3. For Database name, enter awswrangler_test.
  4. Choose Create.

Launching an Amazon SageMaker notebook

An Amazon SageMaker notebook is a managed instance running the Jupyter Notebook app. For this use case, you use it to write and run your code.

  1. On the Amazon SageMaker console, choose Notebook instance.
  2. Choose Create a notebook instance.
  3. For Notebook instance name, enter a name.
  4. For IAM role, choose an existing AWS Identity and Access Management (IAM) role or create a role that allows you to run Amazon SageMaker and grants access to Amazon S3, Athena, and AWS Glue for the related resources.

  5. Wait for the notebook status to show as InService.
  6. Choose Open Jupyter from the notebook instance you created.

Exploring the data

This section walks you through several notebook paragraphs to show how to install and use AWS Data Wrangler.

  1. On the Jupyter console, under New, choose conda_python3.
  2. To install AWS Data Wrangler, enter the following code:
    !pip install awswrangler

  3. To avoid dependency conflicts, restart the notebook kernel by choosing Kernel -> Restart.
  4. Import the library using its usual alias wr:
    import awswrangler as wr

  5. List all files in the NOAA public bucket from the decade of 1880:
    wr.s3.list_objects("s3://noaa-ghcn-pds/csv/188")

The following screenshot shows the output.

  6. Load the whole decade (10 files) into a Pandas DataFrame using the Amazon S3 prefix s3://noaa-ghcn-pds/csv/188:
    col_names = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
    
    df = wr.s3.read_csv(
        path="s3://noaa-ghcn-pds/csv/188",
        names=col_names,
        parse_dates=["dt", "obs_time"]  # Hint to parse these columns as date instead of strings
    )

    The following screenshot shows the output.

  7. Create a new column extracting the year from the dt column (the new column is useful for creating partitions in the Parquet dataset):
    df["year"] = df["dt"].dt.year

The following screenshot shows the output.

  8. Store the Pandas DataFrame in the S3 bucket you created in the beginning of this post (replace the [BUCKET] placeholder in the code with your bucket name):
    wr.s3.to_parquet(
        df=df,
        path="s3://[BUCKET]/noaa/",
        dataset=True,
        database="awswrangler_test",
        table="noaa",
        partition_cols=["year"]
    );

The preceding code creates the table noaa in the awswrangler_test database in the Data Catalog.

  9. After processing this, you can confirm the Parquet files exist in Amazon S3 and that the table noaa is in the AWS Glue Data Catalog. See the following code:
    wr.s3.list_objects("s3://[BUCKET]/noaa/")
    wr.catalog.table(database="awswrangler_test", table="noaa")

The following screenshot shows the output.

  10. Run a SQL query from Athena that filters only the US maximum temperature measurements of the last 3 years (1887–1889) and receive the result as a Pandas DataFrame:
    sql = """
    SELECT
        dt,
        (value / 10.0) AS temperature  -- Converting tenths of degrees C to regular degrees C
    FROM noaa
    WHERE year BETWEEN 1887 AND 1889  -- Only last 3 years (PARTITION filter)
    AND substr(id, 1, 2)='US'  -- Only U.S. stations
    AND element='TMAX'  -- Only Maximum temperature elements
    AND q_flag is NULL  -- Only HIGH quality measurement
    """
    
    df = wr.athena.read_sql_query(sql, database="awswrangler_test")

The following screenshot shows the output.

The following two queries illustrate how you can visualize the data.

  1. To plot the average maximum temperature measured in the tracked station, enter the following code:
    %matplotlib inline
    df.groupby("dt").mean().plot();

The following screenshot shows the output.

  2. To plot a moving average of the previous metric with a 30-day window, enter the following code:
    %matplotlib inline
    df.groupby("dt").mean().rolling(window=30, center=True).mean().plot();

The following screenshot shows the output.

Cleaning up

To avoid incurring future charges, delete the resources from the following services:

  1. AWS Glue database
    • On the AWS Glue console, choose the database you created.
    • From the Actions drop-down menu, choose Delete database.
    • Choose Delete.
  2. Amazon SageMaker notebook
    • On the Amazon SageMaker console, choose the notebook instance you created.
    • From the Actions drop-down menu, choose Stop.
    • When the status shows as Stopped, from the Actions drop-down menu, choose Delete.
    • Choose Delete.
  3. S3 bucket
    • On the Amazon S3 console, choose Buckets.
    • Choose the bucket you created.
    • Choose Empty and enter your bucket name.
    • Choose Confirm.
    • Choose Delete and enter your bucket name.
    • Choose Delete bucket.
  4. IAM Role
    • On the IAM console, choose Roles.
    • Choose the role you attached to Amazon SageMaker.
    • Choose Delete role.
    • Choose Yes.

Conclusion

Installing AWS Data Wrangler is a breeze. With a single command, you can connect ETL tasks to multiple data sources and different data services. The library is a work in progress, with new features and enhancements added regularly. For more tutorials, see the GitHub repo.

 


About the Authors

Satoshi Kuramitsu is a Solutions Architect in AWS. His favorite AWS services are AWS Glue, Amazon Kinesis, and Amazon S3.

Igor Tavares is a Data & Machine Learning Engineer in the AWS Professional Services team and the original creator of AWS Data Wrangler.

Automate Amazon Athena queries for PCI DSS log review using AWS Lambda

Post Syndicated from Logan Culotta original https://aws.amazon.com/blogs/security/automate-amazon-athena-queries-for-pci-dss-log-review-using-aws-lambda/

In this post, I will show you how to use AWS Lambda to automate PCI DSS (v3.2.1) evidence generation and daily log review to assist with your ongoing PCI DSS activities. We will specifically look at AWS CloudTrail logs stored centrally in Amazon Simple Storage Service (Amazon S3), which is also a Well-Architected Security Pillar best practice, and use Amazon Athena to query them.

This post assumes familiarity with creating a database in Athena. If you’re new to Athena, please take a look at the Athena getting started guide and create a database before continuing. Take note of the bucket chosen for the output of Athena query results; we will use it later in this post.

In this post, we walk through:

  • Creating a partitioned table for your AWS CloudTrail logs. In order to reduce costs and time to query results in Athena, we’ll show you how to partition your data. If you’re not already familiar with partitioning, you can learn about it in the Athena user guide.
  • Constructing SQL queries to search for PCI DSS audit log evidence. The SQL queries that are provided in this post are directly related to PCI DSS requirement 10. Customizing these queries to meet your responsibilities can help you prepare for a PCI DSS assessment.
  • Creating an AWS Lambda function to automate running these SQL queries daily, in order to help address the PCI DSS daily log review requirement 10.6.1.

Create and partition a table

The following code will create and partition a table for CloudTrail logs. Before you execute this query, be sure to replace the variable placeholders with the information from your database. They are:

  • <YOUR_TABLE> – the name of your Athena table
  • LOCATION – the path to your CloudTrail logs in Amazon S3. An example is included in the following code. It includes the variable placeholders:
    • <AWS_ACCOUNT_NUMBER> – your AWS account number. If using organizational CloudTrail, use the following format throughout the post for this variable: o-<orgID>/<ACCOUNT_NUMBER>
    • <LOG_BUCKET> – the bucket name where the CloudTrail logs to be queried reside

CREATE EXTERNAL TABLE <YOUR_TABLE> (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING
)
COMMENT 'CloudTrail table'
PARTITIONED BY(region string, year string, month string, day string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');

Execute the query. You should see a message stating Query successful.

Figure 1: Query successful

The preceding query creates a CloudTrail table and defines the partitions in your Athena database. Before you begin running queries to generate evidence, you will need to run alter table commands to finalize the partitioning.

Be sure to update the following variable placeholders with your information:

  • <YOUR_DATABASE> – the name of your Athena database
  • <YOUR_TABLE>
  • <LOG_BUCKET>
  • <AWS_ACCOUNT_NUMBER>

Provide values for the following variables:

  • region – region of the logs to partition
  • month – month of the logs to partition
  • day – day of the logs to partition
  • year – year of the logs to partition
  • LOCATION – the path to your CloudTrail logs in Amazon S3 to partition, down to the specific day (should match the preceding values of region, month, day, and year). It includes the variable placeholders:
    • <AWS_ACCOUNT_NUMBER>
    • <LOG_BUCKET>

ALTER TABLE <YOUR_DATABASE>.<YOUR_TABLE>  ADD partition  (region='us-east-1', month='02', day='28', year='2020') location 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/us-east-1/2020/02/28/';

After the partition has been configured, you can query logs from the date and region that was partitioned. Here’s an example for PCI DSS requirement 10.2.4 (all relevant PCI DSS requirements are described later in this post).


SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND region= 'us-east-1' AND year='2020' AND month='02' AND day='28';

Create a Lambda function to save time

As you can see, the process above can involve a lot of manual steps as you set up partitioning for each region and then query for each day or region. Let’s simplify the process by putting these steps into a Lambda function.

Use the Lambda console to create a function

To create the Lambda function:

  1. Open the Lambda console and choose Create function, and select the option to Author from scratch.
  2. Enter Athena_log_query as the function name, and select Python 3.8 as the runtime.
  3. Under Choose or create an execution role, select Create new role with basic Lambda permissions.
  4. Choose Create function.
  5. Once the function is created, select the Permissions tab at the top of the page and select the Execution role to view in the IAM console. It will look similar to the following figure.

    Figure 2: Permissions tab

Update the IAM Role to allow Lambda permissions to relevant services

  1. In the IAM console, select the policy name. Choose Edit policy, then select the JSON tab and paste the following code into the window, replacing the following variable and placeholders:
    • us-east-1 – This is the region where resources are. Change only if necessary.
    • <AWS_ACCOUNT_NUMBER>
    • <YOUR_DATABASE>
    • <YOUR_TABLE>
    • <LOG_BUCKET>
    • <OUTPUT_LOG_BUCKET> – bucket name you chose to store the query results when setting up Athena.
    
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateDatabase",
                    "glue:BatchCreatePartition",
                    "glue:GetDatabase",
                    "athena:StartQueryExecution",
                    "glue:GetPartitions",
                    "glue:UpdateTable",
                    "s3:CreateBucket",
                    "s3:ListBucket",
                    "glue:GetTable",
                    "s3:ListMultipartUploadParts",
                    "s3:PutObject",
                    "s3:GetObjectAcl",
                    "s3:GetObject",
                    "athena:CancelQueryExecution",
                    "athena:StopQueryExecution",
                    "athena:GetQueryExecution",
                    "s3:GetBucketLocation",
                    "glue:UpdatePartition"
                ],
                "Resource": [
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:catalog",
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:table/<YOUR_DATABASE>/<YOUR_TABLE>",
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:database/mydatabase",
                    "arn:aws:s3:::<LOG_BUCKET>/*",
                    "arn:aws:s3:::<LOG_BUCKET>",
    				"arn:aws:s3:::<OUTPUT_LOG_BUCKET>/*",
    				"arn:aws:s3:::<OUTPUT_LOG_BUCKET>",
                    "arn:aws:athena:us-east-1:<AWS_ACCOUNT_NUMBER>:workgroup/primary"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:PutLogEvents",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream"
                ],
                "Resource": "arn:aws:logs:us-east-1:<AWS_ACCOUNT_NUMBER>:*"
            }
        ]
    }
    

    Note: Depending on the environment, this policy might not be restrictive enough and should be limited to only users needing access to the cardholder data environment and audit logs. More information about restricting IAM policies can be found in IAM JSON Policy Elements: Condition Operators.

  2. Choose Review policy and then Save changes.

Customize the Lambda Function

  1. On the Lambda dashboard, choose the Configuration tab. In Basic settings, increase the function timeout to 5 minutes to ensure that the function always has time to finish running your queries, and then select Save. Best Practices for Developing on AWS Lambda has more tips for using Lambda.
  2. Paste the following code into the function editor on the Configuration tab, replacing the existing text. The code includes eight example queries to run and can be customized as needed.

    The first query will add partitions for your Amazon S3 logs so that the following seven queries will run quickly and be cost-effective.

    This code combines the partitioning step and example Athena queries to assist in meeting PCI DSS logging requirements, which are explained in more detail below:

    Replace these values in the code that follows:

    • <YOUR_DATABASE>
    • <YOUR_TABLE>
    • <LOG_BUCKET>
    • <AWS_ACCOUNT_NUMBER>
    • <OUTPUT_LOG_BUCKET>
    • REGION1 – first region to partition
    • REGION2 – second region to partition
    
    import boto3
    import datetime
    import time
    
    #EDIT THE FOLLOWING#
    #----------------------#
    
    #This should be the name of your Athena database
    ATHENA_DATABASE = "<YOUR_DATABASE>"
    
    #This should be the name of your Athena database table
    ATHENA_TABLE = "<YOUR_TABLE>"
    
    #This is the Amazon S3 bucket name you want partitioned and logs queried from:
    LOG_BUCKET = "<LOG_BUCKET>"
    
    #AWS Account number for the Amazon S3 path to your CloudTrail logs
    AWS_ACCOUNT_ID = "<AWS_ACCOUNT_NUMBER>"
    
    #This is the Amazon S3 bucket name for the Athena Query results:
    OUTPUT_LOG_BUCKET = "<OUTPUT_LOG_BUCKET>"
    
    #Define regions to partition
    REGION1 = "us-east-1"
    REGION2 = "us-west-2"
    #----------------------#
    #STOP EDITING#
    
    RETRY_COUNT = 50
    
    #Getting the current date and splitting into variables to use in queries below
    CURRENT_DATE = datetime.datetime.today()
    DATEFORMATTED = (CURRENT_DATE.isoformat())
    ATHENA_YEAR = str(DATEFORMATTED[:4])
    ATHENA_MONTH = str(DATEFORMATTED[5:7])
    ATHENA_DAY = str(DATEFORMATTED[8:10])
    
    #location for the Athena query results
    OUTPUT_LOCATION = "s3://"+OUTPUT_LOG_BUCKET+"/DailyAthenaLogs/CloudTrail/"+str(CURRENT_DATE.isoformat())
    
    #Athena Query definitions for PCI DSS requirements
    YEAR_MONTH_DAY = f'year=\'{ATHENA_YEAR}\' AND month=\'{ATHENA_MONTH}\' AND day=\'{ATHENA_DAY}\';'
    ATHENA_DB_TABLE = f'{ATHENA_DATABASE}.{ATHENA_TABLE}'
    PARTITION_STATEMENT_1 = f'partition (region="{REGION1}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
    LOCATION_1 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION1}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
    PARTITION_STATEMENT_2 = f'partition (region="{REGION2}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
    LOCATION_2 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION2}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
    SELECT_STATEMENT = "SELECT * FROM "+ATHENA_DB_TABLE+ " WHERE "
    LIKE_BUCKET = f' \'%{LOG_BUCKET}%\''
    
    
    #Query to partition selected regions
    QUERY_1 = f'ALTER TABLE {ATHENA_DB_TABLE} ADD IF NOT EXISTS {PARTITION_STATEMENT_1} {LOCATION_1} {PARTITION_STATEMENT_2} {LOCATION_2}'
    
    #Access to audit trails or CHD 10.2.1/10.2.3
    QUERY_2 = f'{SELECT_STATEMENT} requestparameters LIKE {LIKE_BUCKET} AND sourceipaddress <> \'cloudtrail.amazonaws.com\' AND sourceipaddress <> \'athena.amazonaws.com\' AND eventName = \'GetObject\' AND {YEAR_MONTH_DAY}'
    
    #Root Actions PCI DSS 10.2.2
    QUERY_3 = f'{SELECT_STATEMENT} userIdentity.sessionContext.sessionIssuer.userName LIKE \'%root%\' AND {YEAR_MONTH_DAY}'
    
    #Failed Logons PCI DSS 10.2.4
    QUERY_4 = f'{SELECT_STATEMENT} eventname = \'ConsoleLogin\' AND responseelements LIKE \'%Failure%\' AND {YEAR_MONTH_DAY}'
    
    #Privilege changes PCI DSS 10.2.5.b, 10.2.5.c
    QUERY_5 = f'{SELECT_STATEMENT} eventname LIKE \'%AddUserToGroup%\' AND requestparameters LIKE \'%Admin%\' AND {YEAR_MONTH_DAY}'
    
    # Initialization, stopping, or pausing of the audit logs PCI DSS 10.2.6
    QUERY_6 = f'{SELECT_STATEMENT} (eventname = \'StopLogging\' OR eventname = \'StartLogging\') AND {YEAR_MONTH_DAY}'
    
    #Suspicious activity PCI DSS 10.6
    QUERY_7 = f'{SELECT_STATEMENT} (eventname LIKE \'%DeleteSecurityGroup%\' OR eventname LIKE \'%CreateSecurityGroup%\' OR eventname LIKE \'%UpdateSecurityGroup%\' OR eventname LIKE \'%AuthorizeSecurityGroup%\') AND {YEAR_MONTH_DAY}'
    
    QUERY_8 = f'{SELECT_STATEMENT} eventname LIKE \'%Subnet%\' and eventname NOT LIKE \'Describe%\' AND {YEAR_MONTH_DAY}'
    
    #Defining function to generate query status for each query
    def query_stat_fun(query, response):
        client = boto3.client('athena')
        query_execution_id = response['QueryExecutionId']
        print(query_execution_id +' : '+query)
        for i in range(1, 1 + RETRY_COUNT):
            query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
            query_fail_status = query_status['QueryExecution']['Status']
            query_execution_status = query_fail_status['State']
    
            if query_execution_status == 'SUCCEEDED':
                print("STATUS:" + query_execution_status)
                break
    
            if query_execution_status == 'FAILED':
                print(query_fail_status)
    
            else:
                print("STATUS:" + query_execution_status)
                time.sleep(i)
        else:
            client.stop_query_execution(QueryExecutionId=query_execution_id)
            raise Exception('Maximum Retries Exceeded')
    
    def lambda_handler(event, context):
        client = boto3.client('athena')
        queries = [QUERY_1, QUERY_2, QUERY_3, QUERY_4, QUERY_5, QUERY_6, QUERY_7, QUERY_8]
        for query in queries:
            response = client.start_query_execution(
                QueryString=query,
                QueryExecutionContext={
                    'Database': ATHENA_DATABASE },
                ResultConfiguration={
                    'OutputLocation': OUTPUT_LOCATION })
            query_stat_fun(query, response)
    

    Note: More regions can be added if you have additional regions to partition. The ADD partition statement can be copied and pasted to add additional regions as needed. Additionally, you can hard code the regions for your environment into the statements.

  3. Choose Save in the top right.

Athena Queries used to collect evidence

The queries used to gather evidence for PCI DSS are taken from the Lambda function we created, using the partitioned date example from above. They are listed with their respective requirements.

Note: AWS owns the security OF the cloud, providing high levels of security in alignment with our numerous compliance programs. The customer is responsible for the security of their resources IN the cloud, keeping its content secure and compliant. The queries below are meant to be a proof of concept and should be tailored to your environment.

10.2.1/10.2.3 – Implement automated audit trails for all system components to reconstruct access to either or both cardholder data and audit trails:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE requestparameters LIKE '%<LOG_BUCKET>%' AND sourceipaddress <> 'cloudtrail.amazonaws.com' AND sourceipaddress <>  'athena.amazonaws.com' AND eventName = 'GetObject' AND year='2020' AND month='02' AND day='28';"

10.2.2 – Implement automated audit trails for all system components to reconstruct all actions taken by anyone using root or administrative privileges.


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE userIdentity.sessionContext.sessionIssuer.userName LIKE '%root%' AND year='2020' AND month='02' AND day='28';"

10.2.4 – Implement automated audit trails for all system components to reconstruct invalid logical access attempts.


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND year='2020' AND month='02' AND day='28';"

10.2.5.b – Verify all elevation of privileges is logged.

10.2.5.c – Verify all changes, additions, or deletions to any account with root or administrative privileges are logged:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%AddUserToGroup%' AND requestparameters LIKE '%Admin%' AND year='2020' AND month='02' AND day='28';"

10.2.6 – Implement automated audit trails for all system components to reconstruct the initialization, stopping, or pausing of the audit logs:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'StopLogging' OR eventname = 'StartLogging' AND year='2020' AND month='02' AND day='28';"

10.6 – Review logs and security events for all system components to identify anomalies or suspicious activity:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%DeleteSecurityGroup%' OR eventname LIKE '%CreateSecurityGroup%' OR eventname LIKE '%UpdateSecurityGroup%' OR eventname LIKE '%AuthorizeSecurityGroup%' AND year='2020' AND month='02' AND day='28';" 

"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%Subnet%' and eventname NOT LIKE 'Describe%' AND year='2020' AND month='02' AND day='28';" 

You can use the AWS Command Line Interface (AWS CLI) to invoke the Lambda function using the following command, replacing <YOUR_FUNCTION> with the name of the Lambda function you created:


aws lambda invoke --function-name <YOUR_FUNCTION> outfile

The AWS Lambda API Reference has more information on using Lambda with AWS CLI.

Note: the results from the function will be written to the Amazon S3 location defined by the OUTPUT_LOCATION variable within the Lambda function.
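
To spot-check a day's output without opening the console, you could list the result files under that prefix with a short boto3 snippet (replace <OUTPUT_LOG_BUCKET> as before; the prefix matches the OUTPUT_LOCATION defined in the function):

import boto3

s3 = boto3.client("s3")

response = s3.list_objects_v2(
    Bucket="<OUTPUT_LOG_BUCKET>",
    Prefix="DailyAthenaLogs/CloudTrail/",
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])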

Use Amazon CloudWatch to run your Lambda function

You can create a rule in CloudWatch to have this function run automatically on a set schedule.

Create a CloudWatch rule

  1. From the CloudWatch dashboard, under Events, select Rules, then Create rule.
  2. Under Event Source, select the radio button for Schedule and choose a fixed rate or enter a custom cron expression.
  3. Finally, in the Targets section, choose Lambda function and select your Lambda function from the drop-down menu.

    The example screenshot shows a CloudWatch rule configured to invoke the Lambda function daily:

    Figure 3: CloudWatch rule

  4. Once the schedule is configured, choose Configure details to move to the next screen.
  5. Enter a name for your rule, make sure that State is enabled, and choose Create rule.
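
If you prefer to script the schedule instead of using the console, a minimal boto3 sketch along the following lines creates an equivalent daily rule (the rule name and function ARN are placeholders you would replace):

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

RULE_NAME = "athena-log-query-daily"  # placeholder rule name
FUNCTION_ARN = "arn:aws:lambda:us-east-1:<AWS_ACCOUNT_NUMBER>:function:Athena_log_query"  # placeholder

# Create (or update) a rule that fires once a day
events.put_rule(Name=RULE_NAME, ScheduleExpression="rate(1 day)", State="ENABLED")

# Point the rule at the Lambda function
events.put_targets(Rule=RULE_NAME, Targets=[{"Id": "athena-log-query", "Arn": FUNCTION_ARN}])

# Allow CloudWatch Events to invoke the function
lambda_client.add_permission(
    FunctionName="Athena_log_query",
    StatementId="allow-cloudwatch-events",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=events.describe_rule(Name=RULE_NAME)["Arn"],
)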

Check that your function is running

You can then navigate to your Lambda function’s CloudWatch log group to see if the function is running as intended.

To locate the appropriate CloudWatch group, from your Lambda function within the console, select the Monitoring tab, then choose View logs in CloudWatch.

Figure 4: View logs in CloudWatch

You can take this a step further and set up an Amazon SNS notification to email you when the function is triggered.
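
As a sketch of that idea (the topic name and email address are placeholders, and this is not part of the walkthrough above), you could create a topic, subscribe an address, and publish from the end of the Lambda handler:

import boto3

sns = boto3.client("sns")

# One-time setup: create a topic and subscribe an email address
topic_arn = sns.create_topic(Name="athena-log-query-notifications")["TopicArn"]
sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint="security-team@example.com")

# Called at the end of the Lambda handler after the queries are started
sns.publish(
    TopicArn=topic_arn,
    Subject="Daily PCI DSS log review queries started",
    Message="The Athena_log_query function ran; results will land in the configured output bucket.",
)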

Summary

In this post, we walked through partitioning an Athena table, which assists in reducing time and cost when running queries on your S3 buckets. We then constructed example SQL queries related to PCI DSS requirement 10, to assist in audit preparation. Finally, we created a Lambda function to automate running daily queries to pull PCI DSS audit log evidence from Amazon S3, to assist with the PCI DSS daily log review requirement. I encourage you to customize, add, or remove the SQL queries to best fit your needs and compliance requirements.

If you have feedback about this post, submit comments in the Comments section below.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Logan Culotta

Logan Culotta is a Security Assurance Consultant, and a current Qualified Security Assessor (QSA). Logan is part of the AWS Security Assurance team, which is also a Qualified Security Assessor Company (QSAC). He enjoys finding ways to automate compliance and security in the AWS cloud. In his free time, you can find him spending time with his family, road cycling, or cooking.

Anonymize and manage data in your data lake with Amazon Athena and AWS Lake Formation

Post Syndicated from Manos Samatas original https://aws.amazon.com/blogs/big-data/anonymize-and-manage-data-in-your-data-lake-with-amazon-athena-and-aws-lake-formation/

Organizations collect and analyze more data than ever before. They move as fast as they can on their journey to become more data driven by using the insights from their data.

Different roles use data for different purposes. For example, data engineers transform the data before further processing, data analysts access the data and produce reports, and data scientists with domain and technical expertise can train machine learning algorithms. Those roles require access to the data, and access has never been easier to grant.

At the same time, most organizations have to comply with regulations when dealing with their customer data. For that reason, datasets that contain personally identifiable information (PII) are often anonymized. Common examples of PII are tables and columns that contain personal information about an individual (such as first name and last name) or columns that, if joined with another table, can be traced back to an individual.

You can use AWS Analytics services to anonymize your datasets. In this post, I describe how to use Amazon Athena to anonymize a dataset.  You can then use AWS Lake Formation to provide the right access to the right personas.

Use case

To better understand the concept, we use a straightforward use case: analysts in your organization need access to a dataset with sales data, some of which contains PII information. As the data lake admin, you’re not comfortable with all personnel having access to customers’ PII. To address this, you can use an anonymized dataset.

This use case has two users:

  • datalake_admin – Responsible for data anonymization and making sure the right permissions are enforced. They classify the data, generate anonymized datasets, and configure the required permissions.
  • datalake_analyst – Only has access to the anonymized dataset. They can extract patterns from the data without tracing a record back to an individual customer.

The following AWS CloudFormation template generates the AWS Glue tables that you use later in this post.

However, the template doesn’t create the datalake_admin and datalake_analyst users. For more information about personas in Lake Formation, see Lake Formation Personas and IAM Permissions Reference.

Solution architecture

For this solution, you use the following services:

  • Lake Formation – Lake Formation makes it easy to set up a secure data lake—a centralized, curated, and secured repository that stores all your data, both in its original form and prepared for analysis. The data lake admin can easily label the data and give users permission to access authorized datasets.
  • Athena – Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries you run. For this use case, the data lake admin uses Athena to anonymize the data, after which the data analyst can use Athena for interactive analytics over anonymized datasets.
  • Amazon S3 – Amazon Simple Storage Service (Amazon S3) is an object storage service that offers industry-leading scalability, data availability, security, and performance. For this use case, you use Amazon S3 as storage for the data lake.

The following diagram illustrates the architecture for this solution.

In this architecture, there are no servers to manage. You only pay what you use. You can use the same solution for small or large datasets. The scaling happens behind the scenes but in a transparent way.

In the following sections, you look in more detail on how to do the following:

  • Label sensitive data with AWS Lake Formation
  • Anonymize data with Athena
  • Apply permissions with Lake Formation
  • Analyze the anonymized datasets

Labeling the sensitive data with Lake Formation

As a data lake admin, the first task is to label the personal information. Tags don’t enforce any security controls, but applying a good tagging strategy is a great way to describe the data. Tags are key-value pairs that you can apply to your AWS resources, including tables and columns in your data lake. For this use case, you apply a very simple tagging strategy: for the columns that contain PII, you apply the value PII.

You interact with the following tables from the tcp-ds dataset, which both have their data stored in Amazon S3 in CSV format:

  • store_sales – Stores sales data and references other tables that you can join together for more sophisticated business queries. The table has a foreign key to the customer table on the ss_customer_sk column. This key, when joined with the customer table, can uniquely identify a user. For that reason, treat this column as personal information.
  • customer – Stores customer data, a lot of which is PII. In addition to c_customer_sk, you could use data such as the customer ID (c_customer_id), customer first name (c_first_name), customer last name (c_last_name), login (c_login), and email address (c_email_address) to uniquely identify a customer.

To start tagging your columns (starting with the store_sales table), complete the following steps:

  1. As the data lake admin user, log in to the Lake Formation console.
  2. Choose Data Catalog Tables.
  3. Select store_sales.
  4. Choose Edit schema.
  5. Select the column you want to edit (ss_customer_sk).
  6. Choose Edit.
  7. For Key, enter Classification.
  8. For Value, enter PII.
  9. Choose Save.

To verify that the added column properties were applied, view the table description.

  1. On the Data Catalog Tables page, select store_sales.
  2. Choose View properties.

The table properties look like the following JSON object:

{
"Name": "store_sales",
"DatabaseName": "tcp-ds-1tb",
"Owner": "owner",
"CreateTime": "2019-09-13T10:15:04.000Z",
"UpdateTime": "2020-03-18T16:10:34.000Z",
"LastAccessTime": "2019-09-13T10:15:03.000Z",
"Retention": 0,
"StorageDescriptor": {
"Columns": [
{
"Name": "ss_sold_date_sk",
"Type": "bigint",
"Parameters": {}
},
...
{
"Name": "ss_customer_sk",
"Type": "bigint",
"Parameters": {
"Classification": "PII"
}
},
...
}

The additional column properties are now in the table metadata.
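
If you prefer to check the same metadata programmatically, a short call to the Data Catalog returns the column parameters (the database and table names below match this example):

import boto3

glue = boto3.client("glue")

table = glue.get_table(DatabaseName="tcp-ds-1tb", Name="store_sales")["Table"]
for column in table["StorageDescriptor"]["Columns"]:
    if column.get("Parameters", {}).get("Classification") == "PII":
        print(column["Name"], "is labeled as PII")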

  1. Repeat the preceding steps for the customer table and label the following columns:
    • c_customer_sk
    • c_customer_id
    • c_first_name
    • c_last_name
    • c_login
    • c_email_address

Adding a tag also allows you to perform metadata searches by tag attributes. For more information, see Discovering metadata with AWS Lake Formation: Part 1 and Discover metadata with AWS Lake Formation: Part 2.

Anonymizing data with Athena

The data lake admin now needs to provide the data analyst anonymized datasets for analytics. For this use case, you want to extract patterns on the customer table and the store_sales table separately, but you also want to join the two tables so you can perform more sophisticated queries.

The first step is to create a database in Lake Formation to organize tables in AWS Glue.

  1. On the Lake Formation console, under Data Catalog, choose Databases.
  2. Choose Create database.
  3. For Name, enter a name, such as anonymised_tcp_ds_1tb.
  4. Optionally, enter an Amazon S3 path for the database and a description.
  5. Choose Create database.

The next step is to create the tables that contain the anonymized data. Before you do so, consider the significance of each anonymized column from an analytics point of view. For columns that have little or no value in the analytics process, omitting the column altogether might be the right approach. You might use other columns as primary keys to join with other tables. To make sure that you can join the tables, you can apply a hash function to the table foreign keys.

A common approach to anonymize sensitive information is hashing. A hash function is any function that you can use to map data of arbitrary size to fixed-size values. For more information, see Hash function.

The following table summarizes your strategy for each column.

Table | Column | Strategy
customer | c_first_name | hash
customer | c_last_name | hash
customer | c_login | omit
customer | c_customer_id | hash
customer | c_email_address | omit
customer | c_customer_sk | hash
store_sales | ss_customer_sk | hash

If you use the same value as the input of your hash function, it always returns the same result. In addition, and contrary to encryption, you can’t reverse hashing.
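
A quick way to convince yourself of both properties (the snippet below is illustrative Python, not part of the Athena workflow) is to hash the same value twice and compare the results:

import hashlib

def anonymize(value: str) -> str:
    """SHA-256 hex digest, mirroring sha256(to_utf8(...)) in the Athena queries that follow."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

print(anonymize("12345") == anonymize("12345"))  # True: the same input always yields the same digest
print(anonymize("12345"))  # the digest itself reveals nothing about the original key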

  1. Use Athena string functions to hash individual columns and generate anonymized datasets.
  2. After you create those datasets, you can use Lake Formation to apply security controls. See the following code:
CREATE table "tcp-ds-anonymized".customer
WITH (format='parquet',external_location = 's3://tcp-ds-eu-west-1-1tb-anonymised/2/customer_parquet/')
AS SELECT       
         sha256(to_utf8(cast(c_customer_sk AS varchar))) AS c_customer_sk_anonym,
         sha256(to_utf8(cast(c_customer_id AS varchar))) AS c_customer_id_anonym,
         sha256(to_utf8(cast(c_first_name AS varchar))) AS c_first_name_anonym,
         sha256(to_utf8(cast(c_last_name AS varchar))) AS c_last_name_anonym,
         c_current_cdemo_sk,
         c_current_hdemo_sk,
         c_first_shipto_date_sk,
         c_first_sales_date_sk,
         c_salutation,
         c_preferred_cust_flag,
         c_current_addr_sk,
         c_birth_day,
         c_birth_month,
         c_birth_year,
         c_birth_country,
         c_last_review_date_sk
FROM customer
  3. To preview the data, enter the following code:
SELECT c_first_name_anonym, c_last_name_anonym FROM "tcp-ds-anonymized"."customer" limit 10;

The following screenshot shows the output of your query.

  4. To repeat these steps for the store_sales table, enter the following code:
CREATE table "tcp-ds-anonymized".store_sales
WITH (format='parquet',external_location = 's3://tcp-ds-eu-west-1-1tb-anonymised/1/store_sales/')
AS SELECT sha256(to_utf8(cast(ss_customer_sk AS varchar))) AS ss_customer_sk_anonym,
         ss_sold_date_sk,
         ss_sales_price,
         ss_sold_time_sk,
         ss_item_sk,
         ss_hdemo_sk,
         ss_addr_sk,
         ss_store_sk,
         ss_promo_sk,
         ss_ticket_number,
         ss_quantity,
         ss_wholesale_cost,
         ss_list_price,
         ss_ext_discount_amt,
         ss_ext_sales_price,
         ss_ext_wholesale_cost,
         ss_ext_list_price,
         ss_ext_tax,
         ss_coupon_amt,
         ss_net_paid,
         ss_net_paid_inc_tax,
         ss_net_profit
FROM store_sales;

One of the challenges you need to overcome when working with CTAS queries is that the query’s Amazon S3 location should be unique for the table you’re creating. You can add some incremental value or timestamp to the path of the table, for example, s3:/<bucket>/<table_name>/<version>, and make sure you use a different version number every time.

You can delete older data programmatically using Amazon S3 APIs or SDK. You can also use Amazon S3 lifecycle configuration to tell Amazon S3 to transition objects to another Amazon S3 storage class. For more information, see Object lifecycle management.

You can automate the anonymization of the CTAS query with AWS Glue jobs. AWS Glue provides a lightweight Python shell job option that can call the Amazon Athena API programmatically.
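
A minimal Python shell job along these lines (the database, bucket, and output locations reuse this post's examples and should be treated as placeholders) could run the CTAS statement on a schedule, appending a date-based version to keep the external_location unique:

import datetime

import boto3

athena = boto3.client("athena")

# A date-based version keeps every CTAS run pointing at a fresh, unique S3 location
version = datetime.date.today().strftime("%Y_%m_%d")

ctas = f"""
CREATE TABLE "tcp-ds-anonymized".customer_{version}
WITH (format='parquet',
      external_location='s3://tcp-ds-eu-west-1-1tb-anonymised/{version}/customer_parquet/')
AS SELECT sha256(to_utf8(cast(c_customer_sk AS varchar))) AS c_customer_sk_anonym,
          sha256(to_utf8(cast(c_customer_id AS varchar))) AS c_customer_id_anonym
FROM customer
"""

athena.start_query_execution(
    QueryString=ctas,
    QueryExecutionContext={"Database": "tcp-ds-1tb"},
    ResultConfiguration={"OutputLocation": "s3://tcp-ds-eu-west-1-1tb-anonymised/athena-query-results/"},
)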

Applying permissions with Lake Formation

Now that you have the table structures and anonymized datasets, you can apply the required permissions using Lake Formation.

  1. On the Lake Formation console, under Data Catalog, choose Tables.
  2. Select the tables that contain the anonymized data.
  3. From the Actions drop-down menu, under Permissions, choose Grant.
  4. For IAM users and roles, choose the IAM user for the data analyst.
  5. For Table permissions, select Select.
  6. Choose Grant.

You can now view all table permissions and verify the permissions granted to a particular principal.

Analyzing the anonymized datasets

To verify that the role can access the right tables and query the anonymized datasets, complete the following steps:

  1. Sign in to the AWS Management Console as the data analyst.
  2. Under Analytics, choose Amazon Athena.

You should see a query field, similar to the following screenshot.

You can now test your access with queries. To see the top customers by revenue and last name, enter the following code:

SELECT c_last_name_anonym,
sum(ss_sales_price) AS total_sales
FROM store_sales
JOIN customer
ON store_sales.ss_customer_sk_anonym = customer.c_customer_sk_anonym
GROUP BY c_last_name_anonym
ORDER BY total_sales DESC limit 10;

The following screenshot shows the query output.

You can also try to query a table that you don’t have access to. You should receive an error message.

Conclusion

Anonymizing a dataset is often a prerequisite before users can start analyzing it. In this post, we discussed how data lake admins can use Athena and Lake Formation to label and anonymize data stored in Amazon S3. You can then use Lake Formation to apply permissions to the dataset and allow other users to access the data.

The services we discussed in this post are serverless. Building serverless applications means that your developers can focus on their core product instead of worrying about managing and operating servers or runtimes, either in the cloud or on-premises. This reduced overhead lets developers reclaim time and energy that they can spend on developing great products that scale and that are reliable.

 


About the Author

Manos Samatas is a Specialist Solutions Architect in Big Data and Analytics with Amazon Web Services. Manos lives and works in London. He specialises in architecting Big Data and Analytics solutions for Public Sector customers in the EMEA region.

Build a distributed big data reconciliation engine using Amazon EMR and Amazon Athena

Post Syndicated from Sara Miller original https://aws.amazon.com/blogs/big-data/build-a-distributed-big-data-reconciliation-engine-using-amazon-emr-and-amazon-athena/

This is a guest post by Sara Miller, Head of Data Management and Data Lake, Direct Energy; and Zhouyi Liu, Senior AWS Developer, Direct Energy.

Enterprise companies like Direct Energy migrate on-premises data warehouses and services to AWS to achieve a fully manageable digital transformation of their organization. Freedom from traditional data warehouse constraints frees up time and resources to focus on business innovation, and for Direct Energy, building data models that allow us to focus on customer satisfaction. These projects typically start by replicating data from source relational database management systems (RDBMS) into a single data repository, also known as a data lake. Before you can use this data to drive business insights, you should test it for quality and integrity. The quality of your data has a ripple effect on the insights you can extract and the trust people have in your insights.

Our post tackles data quality and integrity as we introduce a reconciliation tool that uses Amazon EMR and Amazon Athena to provide you with a cloud-native engine to validate vast amounts of data effectively at scale. The engine provides customized summary reports so you can interactively analyze the quality of your data. We also discuss the clarity that the engine has brought to the performance and quality of our source ingestion and extract, transform, and load (ETL) processes.

About Direct Energy

Direct Energy is a North American retail energy and energy services provider and a subsidiary of the UK-based utility company, Centrica. Serving over four million residential and business customers, we strive to give you choice, simplicity, and innovation with simple solutions to track, understand, and control the electricity and natural gas you use.

Teams across Direct Energy came together to ensure a successful transition from on-premises SQL Server data warehouses to AWS data lakes, to include the following:

  1. Our Management Information Systems (MIS) team manages core AWS infrastructure, including AWS Identity and Access Management (IAM), Amazon EMR configurations, and more. The team also manages the ingestion of raw data directly from source data warehouses and billing systems housed on SQL Server, Oracle, and MySQL.
  2. Our Data Engineering team, embedded in business teams, picks up the raw source data after it lands in Amazon Simple Storage Service (Amazon S3), casts data types, merges data for select tables, and transforms the data to build out our business data warehouse, data marts, and datasets using PySpark on Amazon EMR. This includes the orchestration of dependencies across these steps.
  3. Completed data marts and datasets are made available to the business through Amazon Redshift and Microsoft Power BI for reporting and analytical modeling processes, such as churn propensity, customer lifetime value, price elasticity, and segmentation.

Due to the various layers and intricacies required to build out our finalized business data marts and datasets, testing is required at each step.

Why Direct Energy needed a reconciliation engine

The Data Engineering team at Direct Energy needed an automated way to check data quality at the row and field levels for data stored in Amazon S3, including data produced by AWS Glue and viewed through Athena, against our on-premises source systems.

Initially, our MIS team ran daily row counts against the source systems and Athena and was held to a +/-99% SLA, accounting for potential timing issues. We quickly found issues with this process: it only checks that the row counts match between systems and doesn’t adequately profile data or measure comprehensive data quality. Furthermore, the target data on Amazon S3 should match the source system exactly, but it’s less efficient to run row- and field-level checks on large volumes of data.

Beyond this, through the decommissioning of one of our major customer data warehouses, we rewrote over 350 SQL Server stored procedures in PySpark. Another layer of testing is required here to debug our code and to ensure our ETL processes are producing the same results that they previously produced from our on-premises server. This includes the testing of both staging and final tables.

Because we can’t do manual testing efficiently at this scale, we needed a solution that would enable automated testing of data quality at the row and field levels. We architected and developed a reconciliation engine called Pythagoras to randomly select a sample of records to check cell by cell. The tool runs new samples daily to ensure better coverage. This check validates whether individual values match between tables on Amazon S3 and in our source systems.

Pythagoras architecture

We use a config.yaml file to define the source databases, source tables, how to connect to source systems, reconciliation tasks, and other parameters required to specify the task. A sample ratio for the on-premises server tables is also defined here.

We use table_cols_mapping.json to define the column mapping between our on-premises server tables and the tables we expect to see when querying through Athena.

The reconciliation engine outputs a report, which is saved directly to Amazon S3 and can be viewed with Athena. Here, we can check results in SQL or PySpark or can export to Microsoft Excel.

The following diagram provides a walkthrough of the process. We use Apache Spark running on Amazon EMR to execute the reconciliation engine steps: preprocess, comparison, and postprocess.

Preprocess

The preprocessing component in the preceding architecture prepares data for reconciliation, and the output is used as an input in the comparison component. The preprocess step is responsible for several important functions:

  • The engine parses the config.yaml file, which defines parameters such as connection strings for data sources, the log path, performance-tuning parameters, the location of reconciliation reports, and more. In addition, we add the reconciliation group and reconciliation task to the file to instruct Pythagoras to run the data integrity testing. This information tells the PySpark job where to fetch the data and the random sample ratio percentage to use in the reconciliation run. See the following code example:
    data_rec:
    - rec_task_1:
      - rec_task_1:
          sample_ratio: 10  #percent
          sourceA: fruit1
          sourceB: fruitA
          sourceA_database: database_sourceA
          sourceB_database: database_sourceB

  • We fetch data from the two data sources in this step.
  • The engine parses the tablecolmap.json file, which maps the columns to be reconciled between the two data sources. For example, in the following JSON code, the value of column fruitA from data source B needs to be compared with the value of column fruit1 from data source A. The same logic applies to columns animalB and animal2.
    "rec_task_1": {
      "cols_mapping_b_a": {
        "fruitA": "fruit1",
        "animalB": "animal2"
      }
    }

  • If we have a record from data source A, how can we find the corresponding record in target B? The answer is by using a key column. This key should exist in both sources and it should be unique. The component looks up the user-defined key in the joined_keys.py file to join the two datasets as a single data frame via the key and passes it to the comparison component.
  • Normalizing data types for both data sources also happens in this step if needed. For example, if the value of column x from source A is 1 stored as an integer, and the value of column x from source B is 1 stored as a string, these values are treated as different without any data type normalization; however, they are the same if we ignore the data type formatting.

Comparison

After fetching the output data frame from the preprocess pipeline, the comparison component is ready to perform the reconciliation. The following table shows an example schema of an input data frame.

id | sourceA_animal | sourceA_fruit | sourceA_car | animal | fruit | car
1 | cat | orange | jeep | cat | orange | jeep
2 | dog | apple | toyota | dog | pear | toyota

Columns starting with prefix sourceA_ indicate the data is from data source A. Columns titled animal, fruit, and car come from data source B. In this instance, the separated tabular data is linked and joined on a key id; the key should be uniquely defined in both data sources.

The engine reconciles values from columns in the preceding input data frame. The following table summarizes the output.

id | sourceA_animal | sourceA_fruit | sourceA_car | animal | fruit | car | matched_animal | matched_fruit | matched_car | total_matched
1 | cat | orange | jeep | cat | orange | jeep | TRUE | TRUE | TRUE | TRUE
2 | dog | apple | toyota | dog | pear | toyota | TRUE | FALSE | TRUE | FALSE

Columns with the prefix matched_ indicate the reconciled result of the corresponding cells. For instance, in the second record, column sourceA_fruit is apple and column fruit is pear, so the value of matched_fruit is FALSE.
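
The comparison itself reduces to a per-column equality check on the joined data frame. A simplified PySpark sketch of that step (column names follow the example above; this is not the full Pythagoras implementation) could look like the following:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pythagoras-comparison").getOrCreate()

# Joined frame from the preprocess step: source A columns carry the sourceA_ prefix
df = spark.createDataFrame(
    [(1, "cat", "orange", "jeep", "cat", "orange", "jeep"),
     (2, "dog", "apple", "toyota", "dog", "pear", "toyota")],
    ["id", "sourceA_animal", "sourceA_fruit", "sourceA_car", "animal", "fruit", "car"],
)

# Add one matched_ flag per mapped column, then an overall total_matched flag
cols_mapping_b_a = {"animal": "sourceA_animal", "fruit": "sourceA_fruit", "car": "sourceA_car"}
matched_cols = []
for col_b, col_a in cols_mapping_b_a.items():
    flag = f"matched_{col_b}"
    df = df.withColumn(flag, F.col(col_a).eqNullSafe(F.col(col_b)))
    matched_cols.append(flag)

df = df.withColumn("total_matched", F.expr(" AND ".join(matched_cols)))
df.show()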

Postprocess

Based on the detailed reconciled results, the engine generates various customized reports and saves those reports to a configured location on Amazon S3. The end-user can use SQL to query against Athena to quickly analyze the data, or can download the results to Microsoft Excel. In this section, we describe three of the reports we use daily.

The following table shows the reconciliation summary at the column level.

table_name | col_name | num_matched_true | num_matched_false
MyFavorite | matched_animal | 2 | 0
MyFavorite | matched_car | 2 | 0
MyFavorite | matched_fruit | 1 | 1
MyFavorite | total_matched | 1 | 1

This report is generated based on the information from the preceding output. Let’s assume the table is called MyFavorite. The engine counts the matched and unmatched cases over each column and provides an aggregated view of the number of records matching for each column.
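
As a rough illustration of how such a summary could be derived with Spark aggregations (using toy data matching the tables above; this is not the production report code):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, sum as spark_sum, when

    spark = SparkSession.builder.appName("rec-summary").getOrCreate()

    # Tiny stand-in for the detailed comparison output shown earlier
    result_df = spark.createDataFrame(
        [(1, True, True, True, True), (2, True, False, True, False)],
        ["id", "matched_animal", "matched_fruit", "matched_car", "total_matched"],
    )

    summary = None
    for c in ["matched_animal", "matched_car", "matched_fruit", "total_matched"]:
        row = result_df.agg(
            spark_sum(when(col(c), 1).otherwise(0)).alias("num_matched_true"),
            spark_sum(when(~col(c), 1).otherwise(0)).alias("num_matched_false"),
        ).withColumn("table_name", lit("MyFavorite")).withColumn("col_name", lit(c))
        summary = row if summary is None else summary.union(row)

    summary.select("table_name", "col_name", "num_matched_true", "num_matched_false").show()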

In a production environment, the Data Engineering team needs to reconcile hundreds of tables in one batch. Pythagoras naturally supports wrapping these into one reconciliation task. In this case, we can generate another report to show the reconciliation summary per reconciliation task.

For example, in addition to the table MyFavorite, the reconciliation task has another table called MySkills, which needs to be reconciled. The following table shows its reconciliation summary at the column level.

table_name | col_name           | num_matched_true | num_matched_false
MySkills   | matched_sport      | 4                | 0
MySkills   | matched_instrument | 3                | 1
MySkills   | total_matched      | 3                | 1

Based on the two preceding summaries, Pythagoras calculates high-level table statistics, as shown in the following table.

table_name | data_quality | total_match_rate | issue_desc
MyFavorite | Red          | 50%              | columns: fruit
MySkills   | Amber        | 75%              | columns: instrument

In the config.yaml file, the user defines the parameter total_match_rate_threshold. For this use case, let’s say we define it as 50%. If the total_match_rate is greater than 50% and less than 100%, we label data_quality as Amber; if it is less than or equal to 50%, we label it as Red. The column issue_desc shows the list of unmatched columns (all columns with any unmatched values).
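
In plain Python, that labeling rule amounts to something like the following sketch; treating a fully matched table as Green is our assumption here, because the rule above only spells out the Amber and Red cases.

    def label_data_quality(total_match_rate: float, threshold: float = 0.5) -> str:
        # At or below the threshold -> Red; between the threshold and 100% -> Amber.
        # Returning "Green" for a fully matched table is an assumption.
        if total_match_rate <= threshold:
            return "Red"
        if total_match_rate < 1.0:
            return "Amber"
        return "Green"

    print(label_data_quality(0.50))  # Red   (MyFavorite)
    print(label_data_quality(0.75))  # Amber (MySkills)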

These reports help us assess overall data quality for all tables in one reconciliation task and quickly locate problem tables and columns. We use Athena to query the Pythagoras results using SQL. For example, in the following SQL query, we can filter ingested tables with data quality flagged as Red by Pythagoras (in other words, the MyFavorite table is returned):

SELECT table_name FROM "recon_summary" WHERE data_quality = 'Red'

We also use Athena to query Pythagoras results for every step of our ETL process. We review a daily report that enables us to focus on which steps are having issues and the top columns that are experiencing a mismatch.

In the following output table, we would focus on steps 2, 3, and 9 first, because there are glaring issues and other steps may have dependencies on these, then come back to step 5 for some minor cleanup. Otherwise, anything above 99.9% is left alone, accounting for timing issues in our billing systems, for example.

table_name | data_quality | total_match_rate | issue_desc
step1      | green        | 0.9996608        | columns: col1: 200, col2: 93, col3: 12, col4: 10, col5: 10
step2      | red          | 0.82664197       | columns: col5: 30, col3: 22, col4: 16, col1: 15, col2: 14
step3      | red          | 0.95370907       | columns: col1: 50, col2: 43, col3: 12, col4: 10, col5: 1
step4      | green        | 1                |
step5      | amber        | 0.9987953        | columns: col1: 200, col2: 93, col3: 12, col4: 10, col5: 14
step6      | green        | 0.99992985       | columns: col1: 25
step7      | green        | 0.99950025       | columns: col1: 200, col2: 93
step8      | green        | 0.99993417       | columns: col1: 50
step9      | red          | 0.24940514       | columns: col1: 19000, col2: 4293, col3: 1400, col4: 1000, col5: 180
step10     | green        | 0.99930817       | columns: col1: 573, col2: 420, col3: 120

We can also perform SQL queries in Athena to drill down further into each step to identify each column’s actual match rate for that particular sample. For example, the following table looks at step2:

table_name | col_name | num_match_true | num_match_false
step2      | col5     | 277            | 30
step2      | col3     | 285            | 22
step2      | col4     | 291            | 16
step2      | col1     | 292            | 15
step2      | col2     | 293            | 14

For this use case, we want to look at col5 first because 30 records in the sample are unmatched, whereas 277 records are matched, then work our way down the list to clean up each column.

Conclusion

In this post, we discussed how Direct Energy uses a data reconciliation tool called Pythagoras to automate and test data quality checks at scale, using Amazon EMR to verify data quality and Athena to analyze and report the results. Pythagoras brought significant clarity regarding the performance and quality of both Direct Energy's source data ingestion and ETL processes, while eliminating the need for manual testing and enabling automated, randomized testing on a much greater scale.

Thus far, the ETL processes for two billing systems have been thoroughly vetted, resulting in 15% and 48% improvements in accuracy. We found that value mismatches are the most common data integrity issue in our data ingestion pipeline. Thanks to Pythagoras, we can quickly and precisely determine these mismatches in large datasets. Data engineering and platform teams then use the data Pythagoras provides to debug our ETL pipelines. After we adjust our pipelines, we run Pythagoras again to ensure the issue is fixed and stays fixed.

The implementation of this tool empowers Direct Energy to decommission widely used data platforms with precision and efficiency, and builds trust in our company’s data quality and integrity across the business.

 


About the Authors

Sara Miller is the Head of Data Management and Data Lake at Direct Energy, a subsidiary of Centrica. She has been with the organization for more than five years. As a versatile leader proficient in data engineering, mathematics, and statistics, Sara has helped organizations transform their reporting and analytics capabilities and has been instrumental in establishing various data science and analytics teams. She currently manages the end-to-end ETL pipeline for the North America residential portfolio, including the transition from on-premises data warehousing to Amazon Web Services.

Zhouyi Liu is a Senior AWS Developer at Direct Energy, a subsidiary of Centrica. He focuses on big data, machine learning, and AI. He currently works on the end-to-end ETL pipeline for the North America residential portfolio, including the transition from on-premises data warehousing to Amazon Web Services. Outside of work, he enjoys being a father and husband and spending time with his family.


How to retroactively encrypt existing objects in Amazon S3 using S3 Inventory, Amazon Athena, and S3 Batch Operations

Post Syndicated from Adam Kozdrowicz original https://aws.amazon.com/blogs/security/how-to-retroactively-encrypt-existing-objects-in-amazon-s3-using-s3-inventory-amazon-athena-and-s3-batch-operations/

Amazon Simple Storage Service (S3) is an object storage service that offers industry-leading scalability, performance, security, and data availability. With Amazon S3, you can choose from three different server-side encryption configurations when uploading objects:

  • SSE-S3 – uses Amazon S3-managed encryption keys
  • SSE-KMS – uses customer master keys (CMKs) stored in AWS Key Management Service (KMS)
  • SSE-C – uses master keys provided by the customer in each PUT or GET request

These options allow you to choose the right encryption method for the job. But as your organization evolves and new requirements arise, you might find that you need to change the encryption configuration for all objects. For example, you might be required to use SSE-KMS instead of SSE-S3 because you need more control over the lifecycle and permissions of the encryption keys in order to meet compliance goals.

You could change the settings on your buckets to use SSE-KMS rather than SSE-S3, but the switch only impacts newly uploaded objects, not objects that existed in the buckets before the change in encryption settings. Manually re-encrypting older objects under master keys in KMS may be time-prohibitive depending on how many objects there are. Automating this effort is possible using the right combination of features in AWS services.
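
For a small number of objects, you could re-encrypt manually by copying each object over itself with new encryption settings; the following boto3 sketch shows the idea for a single object (the bucket, key, and KMS key ID are placeholders). At the scale of millions of objects, this one-at-a-time approach is exactly what becomes time-prohibitive.

    import boto3

    s3 = boto3.client("s3")

    bucket = "my-bucket"                                 # placeholder
    key = "path/to/object"                               # placeholder
    kms_key_id = "1234abcd-12ab-34cd-56ef-1234567890ab"  # placeholder

    # Copying an object over itself with new encryption settings re-encrypts it in place
    s3.copy_object(
        Bucket=bucket,
        Key=key,
        CopySource={"Bucket": bucket, "Key": key},
        ServerSideEncryption="aws:kms",
        SSEKMSKeyId=kms_key_id,
    )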

In this post, I’ll show you how to use Amazon S3 Inventory, Amazon Athena, and Amazon S3 Batch Operations to provide insights on the encryption status of objects in S3 and to remediate incorrectly encrypted objects in a massively scalable, resilient, and cost-effective way. The solution uses a similar approach to the one mentioned in this blog post, but it has been designed with automation and multi-bucket scalability in mind. Tags are used to target individual noncompliant buckets in an account, and any encrypted (or unencrypted) object can be re-encrypted using SSE-S3 or SSE-KMS. Versioned buckets are also supported, and the solution operates on a regional level.

Note: You can’t re-encrypt to or from objects encrypted under SSE-C. This is because the master key material must be provided during the PUT or GET request, and cannot be provided as a parameter for S3 Batch Operations.

Moreover, the entire solution can be deployed in under 5 minutes using AWS CloudFormation. Simply tag your buckets targeted for encryption, upload the solution artifacts into S3, and deploy the artifact template through the CloudFormation console. In the following sections, you will see that the architecture has been built to be easy to use and operate, while at the same time containing a large number of customizable features for more advanced users.

Solution overview

At a high level, the core features of the architecture consist of three services interacting with one another: S3 Inventory reports (1) are delivered for targeted buckets, the report delivery events trigger an AWS Lambda function (2), and the Lambda function then runs S3 Batch Operations jobs (3) using the reports as input to encrypt the targeted buckets. Figure 1 below and the remainder of this section provide a more detailed look at what is happening underneath the surface. If this is not of high interest to you, feel free to skip ahead to the Prerequisites and Solution deployment sections.

Figure 1: Solution architecture overview

Here’s a detailed overview of how the solution works, as shown in Figure 1 above:

  1. When the CloudFormation template is first launched, a number of resources are created, including:
    • An S3 bucket to store the S3 Inventory reports
    • An S3 bucket to store S3 Batch Job completion reports
    • A CloudWatch event that is triggered by changes to tags on S3 buckets
    • An AWS Glue Database and AWS Glue Tables that can be used by Athena to query S3 Inventory and S3 Batch report findings
    • A Lambda function that is used as a Custom Resource during template launch, and afterwards as a target for S3 event notifications and CloudWatch events
  2. During deployment of the CloudFormation template, a Lambda-backed Custom Resource lists all S3 buckets within the AWS Region specified and checks to see if any has a configurable tag present (configured via an AWS CloudFormation parameter). When a bucket with the specified tag is discovered, the Lambda configures an S3 Inventory report for the discovered bucket to be delivered to the newly-created central report destination bucket.
  3. When a new S3 Inventory report arrives into the central report destination bucket (which can take between 1-2 days) from any of the tagged buckets, an S3 Event Notification triggers the Lambda to process it.
  4. The Lambda function first adds the path of the report CSV file as a partition to the AWS Glue table. This means that as each bucket delivers its report, it becomes instantly queryable by Athena, and any queries executed return the most recent information available on the status of the S3 buckets in the account.
  5. The Lambda function then checks the value of the EncryptBuckets parameter in the CloudFormation launch template to assess whether any re-encryption action should be taken. If it is set to yes, the Lambda function creates an S3 Batch Operations job and executes it (a sketch of such a job request follows this list). The job takes each object listed in the manifest report and copies it over itself in the exact same location. When the copy occurs, SSE-KMS or SSE-S3 encryption is specified in the job parameters, effectively re-encrypting all identified objects with the proper configuration.
  6. Once the batch job finishes for the S3 Inventory report, a completion report is sent to the central batch job report bucket. The CloudFormation template provides a parameter that controls the option to include either all successfully processed objects or only objects that were unsuccessfully processed. These reports can also be queried with Athena, since the reports are also added as partitions to the AWS Glue batch reports tables as they arrive.
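
For orientation, the following is a hedged sketch of what such an S3 Batch Operations copy job request can look like when created with boto3; it is not the solution's actual Lambda code, and every account ID, ARN, key ID, and ETag shown is a placeholder.

    import uuid
    import boto3

    s3control = boto3.client("s3control")

    response = s3control.create_job(
        AccountId="111122223333",                                  # placeholder account ID
        ConfirmationRequired=False,
        ClientRequestToken=str(uuid.uuid4()),
        Priority=10,
        RoleArn="arn:aws:iam::111122223333:role/batch-ops-role",   # placeholder role
        Operation={
            "S3PutObjectCopy": {
                # Copy objects back into the same (placeholder) bucket with SSE-KMS
                "TargetResource": "arn:aws:s3:::example-target-bucket",
                "SSEAwsKmsKeyId": "1234abcd-12ab-34cd-56ef-1234567890ab",
                "NewObjectTagging": [{"Key": "__ObjectEncrypted", "Value": "true"}],
            }
        },
        Manifest={
            "Spec": {"Format": "S3InventoryReport_CSV_20161130"},
            "Location": {
                "ObjectArn": "arn:aws:s3:::example-report-bucket/inventory/manifest.json",
                "ETag": "example-manifest-etag",                   # placeholder ETag
            },
        },
        Report={
            "Bucket": "arn:aws:s3:::example-batch-report-bucket",
            "Format": "Report_CSV_20180820",
            "Enabled": True,
            "ReportScope": "FailedTasksOnly",
        },
    )
    print(response["JobId"])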

Prerequisites

To follow along with the sample deployment, your AWS Identity and Access Management (IAM) principal (user or role) needs administrator access or equivalent.

Solution deployment

For this walkthrough, the solution will be configured to encrypt objects using SSE-KMS, rather than SSE-S3, when an inventory report is delivered for a bucket. Please note that the key policy of the KMS key will be automatically updated by the custom resource during launch to allow S3 to use it to encrypt inventory reports. No key policies are changed if SSE-S3 encryption is selected instead. The configuration in this walkthrough also adds a tag to all newly encrypted objects. You’ll learn how to use this tag to restrict access to unencrypted objects in versioned buckets. I’ll make callouts throughout the deployment guide for when you can choose a different configuration from what is deployed in this post.

To deploy the solution architecture and validate its functionality, you’ll perform five steps:

  1. Tag target buckets for encryption
  2. Deploy the CloudFormation template
  3. Validate delivery of S3 Inventory reports
  4. Confirm that reports are queryable with Athena
  5. Validate that objects are correctly encrypted

If you are only interested in deploying the solution and encrypting your existing environment, Steps 1 and 2 are all that are required to be completed. Steps 3 through 5 are optional on the other hand, and outline procedures that you would perform to validate the solution’s functionality. They are primarily for users who are looking to dive deep and take advantage of all of the features available.

With that being said, let’s get started with deploying the architecture!

Step 1: Tag target buckets

Navigate to the Amazon S3 console and identify which buckets should be targeted for inventorying and encryption. For each identified bucket, tag it with a designated key value pair by selecting Properties > Tags > Add tag. This demo uses the tag __Inventory: true and tags only one bucket called adams-lambda-functions, as shown in Figure 2.

Figure 2: Tagging a bucket targeted for encryption in Amazon S3

Step 2: Deploy the CloudFormation template

  1. Download the S3 encryption solution. There will be two files that make up the backbone of the solution:
    • encrypt.py, which contains the Lambda microservices logic;
    • deploy.yml, which is the CloudFormation template that deploys the solution.
  2. Zip the file encrypt.py, rename it to encrypt.zip, and then upload it into any S3 bucket that is in the same Region as the one in which the CloudFormation template will be deployed. Your bucket should look like Figure 3:

    Figure 3: encrypt.zip uploaded into an S3 bucket

  3. Navigate to the CloudFormation console and then create the CloudFormation stack using the deploy.yml template. For more information, see Getting Started with AWS CloudFormation in the CloudFormation User Guide. Figure 4 shows the parameters used to achieve the configuration specified for this walkthrough, with the fields outlined in red requiring input. You can choose your own configuration by altering the appropriate parameters if the ones specified do not fit your use case.

    Figure 4: Set the parameters in the CloudFormation stack

Step 3: Validate delivery of S3 Inventory reports

After you’ve successfully deployed the CloudFormation template, select any of your tagged S3 buckets and check that it now has an S3 Inventory report configuration. To do this, navigate to the S3 console, select a tagged bucket, select the Management tab, and then select Inventory, as shown in Figure 5. You should see that an inventory configuration exists. An inventory report will be delivered automatically to this bucket within 1 to 2 days, depending on the number of objects in the bucket. Make a note of the name of the bucket where the inventory report will be delivered. The bucket is given a semi-random name during creation through the CloudFormation template, so making a note of this will help you find the bucket more easily when you check for report delivery later.

Figure 5: Check that the tagged S3 bucket has an S3 Inventory report configuration

Step 4: Confirm that reports are queryable with Athena

  1. After 1 to 2 days, navigate to the inventory reports destination bucket and confirm that reports have been delivered for buckets with the __Inventory: true tag. As shown in Figure 6, a report has been delivered for the adams-lambda-functions bucket.

    Figure 6: Confirm delivery of reports to the S3 reports destination bucket

  2. Next, navigate to the Athena console and select the AWS Glue database that contains the table holding the schema and partition locations for all of your reports. If you used the default values for the parameters when you launched the CloudFormation stack, the AWS Glue database will be named s3_inventory_database, and the table will be named s3_inventory_table. Run the following query in Athena:
    
    SELECT encryption_status, count(*) FROM s3_inventory_table GROUP BY encryption_status;
    

    The outputs of the query will be a snapshot aggregate count of objects in the categories of SSE-S3, SSE-C, SSE-KMS, or NOT-SSE across your tagged bucket environment, before encryption took place, as shown in Figure 7.

    Figure 7: Query results in Athena

    From the query results, you can see that the adams-lambda-functions bucket had only two items in it, both of which were unencrypted. At this point, you can choose to perform any other analytics with Athena on the delivered inventory reports.

Step 5: Validate that objects are correctly encrypted

  1. Navigate to any of your target buckets in Amazon S3 and check the encryption status of a few sample objects by selecting the Properties tab of each object. The objects should now be encrypted using the specified KMS CMK. Because you set the AddTagToEncryptedObjects parameter to yes during the CloudFormation stack launch, these objects should also have the __ObjectEncrypted: true tag present. As an example, Figure 8 shows the rules_present_rule.zip object from the adams-lambda-functions bucket. This object has been properly encrypted using the correct KMS key, which has an alias of blog in this example, and it has been tagged with the specified key value pair.

    Figure 8: Checking the encryption status of an object in S3

  2. For further validation, navigate back to the Athena console and select the s3_batch_table from the s3_inventory_database, assuming that you left the default names unchanged. Then, run the following query:
    
    SELECT * FROM s3_batch_table;
    

    If encryption was successful, this query should result in zero items being returned because the solution by default only delivers S3 batch job completion reports on items that failed to copy. After validating by inspecting both the objects themselves and the batch completion reports, you can now safely say that the contents of the targeted S3 buckets are correctly encrypted.

Next steps

Congratulations! You’ve successfully deployed and operated a solution for rectifying S3 buckets with incorrectly encrypted and unencrypted objects. The architecture is massively scalable because it uses S3 Batch Operations and Lambda, it’s fully serverless, and it’s cost effective to run.

Please note that if you selected no for the EncryptBuckets parameter during the initial launch of the CloudFormation template, you can retroactively perform encryption on targeted buckets by simply doing a stack update. During the stack update, switch the EncryptBuckets parameter to yes, and proceed with deployment as normal. The update will reconfigure S3 inventory reports for all target S3 buckets to get the most up-to-date inventory. After the reports are delivered, encryption will proceed as desired.

Moreover, with the solution deployed, you can target new buckets for encryption just by adding the __Inventory: true tag. CloudWatch Events will register the tagging action and automatically configure an S3 Inventory report to be delivered for the newly tagged bucket.

Finally, now that your S3 buckets are properly encrypted, you should take a few more manual steps to help maintain your newfound account hygiene:

  • Perform remediation on unencrypted objects that may have failed to copy during the S3 Batch Operations job. The most common reason that objects fail to copy is when object size exceeds 5 GiB. S3 Batch Operations uses the standard CopyObject API call underneath the surface, but this API call can only handle objects less than 5 GiB in size. To successfully copy these objects, you can modify the solution you learned in this post to launch an S3 Batch Operations job that invokes Lambda functions. In the Lambda function logic, you can make CreateMultipartUpload API calls on objects that failed with a standard copy. The original batch job completion reports provide detail on exactly which objects failed to encrypt due to size.
  • Prohibit the retrieval of unencrypted object versions for buckets that had versioning enabled. When the object is copied over itself during the encryption process, the old unencrypted version of the object still exists. This is where the option in the solution to specify a tag on all newly encrypted objects becomes useful—you can now use that tag to draft a bucket policy that prohibits the retrieval of old unencrypted objects in your versioned buckets. For the solution that you deployed in this post, such a policy would look like this:
    
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect":     "Deny",
          "Action":     "s3:GetObject",
          "Resource":    "arn:aws:s3:::adams-lambda-functions/*",
          "Principal":   "*",
          "Condition": {  "StringNotEquals": {"s3:ExistingObjectTag/__ObjectEncrypted": "true" } }
        }
      ]
    }
    

  • Update bucket policies to prevent the upload of unencrypted or incorrectly encrypted objects (see the sketch after this list). By updating bucket policies, you help ensure that in the future, newly uploaded objects will be correctly encrypted, which will help maintain account hygiene. The S3 encryption solution presented here is meant to be a one-time remediation tool, while you should view updating bucket policies as a preventative action. Proper use of bucket policies will help ensure that the S3 encryption solution is not needed again, unless another encryption requirement change occurs in the future. To learn more, see How to Prevent Uploads of Unencrypted Objects to Amazon S3.
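
As a sketch of that preventative step (separate from the deployed solution), a bucket policy that denies s3:PutObject requests not using SSE-KMS could be applied with boto3 along these lines; the bucket name is a placeholder, and the linked post covers the full pattern and additional conditions you may want.

    import json
    import boto3

    s3 = boto3.client("s3")
    bucket = "example-bucket"  # placeholder bucket name

    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenyNonKmsUploads",
                "Effect": "Deny",
                "Principal": "*",
                "Action": "s3:PutObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
                "Condition": {
                    "StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}
                },
            }
        ],
    }

    s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))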

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the Amazon S3 forum.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Adam Kozdrowicz

Adam is a Data and Machine Learning Engineer for AWS Professional Services. He works closely with enterprise customers building big data applications on AWS, and he enjoys working with frameworks such as AWS Amplify, SAM, and CDK. During his free time, Adam likes to surf, travel, practice photography, and build machine learning models.

Enforce column-level authorization with Amazon QuickSight and AWS Lake Formation

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/enforce-column-level-authorization-with-amazon-quicksight-and-aws-lake-formation/

Amazon QuickSight is a fast, cloud-powered business intelligence service that makes it easy to deliver insights and integrates seamlessly with your data lake built on Amazon Simple Storage Service (Amazon S3). QuickSight users in your organization often need access to only a subset of columns for compliance and security reasons. Without a proper way to enforce column-level security, you have to develop additional workarounds, such as views, data masking, or encryption.

QuickSight accounts can now take advantage of AWS Lake Formation column-level authorization to enforce granular-level access control for their users.

Overview of solution

In this solution, you build an end-to-end data pipeline using Lake Formation to ingest data from an Amazon Aurora MySQL database to an Amazon S3 data lake and use Lake Formation to enforce column-level access control for QuickSight users.

The following diagram illustrates the architecture of this solution.

Walkthrough overview

The detailed steps in this solution include building a data lake using Lake Formation, which uses an Aurora MySQL database as the source and Amazon S3 as the target data lake storage. You create a workflow in Lake Formation that imports a single table from the source database to the data lake. You then use Lake Formation security features to enforce column-level security for QuickSight service on the imported table. Finally, you use QuickSight to connect to this data lake and visualize only the columns for which Lake Formation has given access to QuickSight user.

To implement the solution, you complete the following steps:

  1. Prerequisites
  2. Creating a source database
  3. Importing a single table from the source database
    • Creating a connection to the data source
    • Creating and registering your S3 bucket
    • Creating a database in the Data Catalog and granting permissions
    • Creating and running the workflow
    • Granting Data Catalog permissions
  4. Enforcing column-level security in Lake Formation
  5. Creating visualizations in QuickSight

Prerequisites

For this walkthrough, you should have the following prerequisites:

Creating a source database

In this step, create an Aurora MySQL database cluster and use the DDLs in the following GitHub repo to create an HR schema with associated tables and sample data.

You should then see the schema you created using the MySQL monitor or your preferred SQL client. For this post, I used SQL Workbench. See the following screenshot.

Record the Aurora database JDBC endpoint information; you need it in subsequent steps.

Importing a single table from the source database

Before you complete the following steps, make sure you have set up Lake Formation and met the JDBC prerequisites.

The Lake Formation setup creates a datalake_user IAM user. You need to add the same user as a QuickSight user. For instructions, see Managing User Access Inside Amazon QuickSight. For Role, choose AUTHOR.

Creating a connection to the data source

After you complete the Lake Formation prerequisites, which include creating IAM users datalake_admin and datalake_user, create a connection in your Aurora database. For instructions, see Create a Connection in AWS Glue. Provide the following information:

  • Connection name<yourPrefix>-blog-datasource
  • Connection type – JDBC
  • Database connection parameters – JDBC URL, user name, password, VPC, subnet, and security group

Creating and registering your S3 bucket

In this step, you create an S3 bucket named <yourPrefix>-blog-datalake, which you use as the root location of your data lake. After you create the bucket, you need to register the Amazon S3 path. Lastly, grant data location permissions.

Creating a database in the Data Catalog and granting permissions

Create a database in the Lake Formation Data Catalog named <yourPrefix>-blog-database, which stores the metadata tables. For instructions, see Create a Database in the Data Catalog.

After you create the database, you grant data permissions to the metadata tables to the LakeFormationWorkflowRole role, which you use to run the workflows.

Creating and running the workflow

In this step, you copy the EMPLOYEES table from the source database using a Lake Formation blueprint. Provide the following information:

  • Blueprint type – Database snapshot
  • Database connection<yourPrefix>-blog-datasource
  • Source data pathHR/EMPLOYEES
  • Target database<yourPrefix>-blog-database
  • Target storage location<yourPrefix>-blog-datalake
  • Workflow name<yourPrefix>-datalake-quicksight
  • IAM roleLakeFormationWorkflowRole
  • Table prefixblog

For instructions, see Use a Blueprint to Create a Workflow.

When the workflow is ready, you can start the workflow and check its status by choosing View graph. When the workflow is complete, you can see the employee table available in your Data Catalog under <yourPrefix>-blog-database. See the following screenshot.

You can also view the imported data using Athena, which is integrated with Lake Formation. To do so, choose View Data on the Actions drop-down menu. See the following screenshot.

Granting Data Catalog permissions

In this step, you provide the Lake Formation Data Catalog access to the IAM user datalake_user. This is the same user that you added in QuickSight to create the dashboard. For Database permissions, select Create table and Alter for this use case, but you can change the permission level based on your specific requirements. For instructions, see Granting Data Catalog Permissions.

When this step is complete, you see the permissions for your database <yourPrefix>-blog-database.

Enforcing column-level security in Lake Formation

Now that your table is imported into the data lake, enforce column-level security to the dataset. For this use case, you want to hide the Salary and Phone_Number columns from business intelligence QuickSight users.

  1. In the Lake Formation Data Catalog, choose Databases.
  2. From the list of databases, choose <yourPrefix>-blog-database.
  3. Choose View tables.
  4. Select blog_hr_employees.
  5. From the Actions drop-down menu, choose Grant.

  1. For Active Directory and Amazon QuickSight users and groups, provide the QuickSight user ARN.

You can find the ARN by running the command aws quicksight list-users --aws-account-id <your AWS account id> --namespace default --region us-east-1 in the AWS Command Line Interface (AWS CLI).

  1. For Database, choose <yourPrefix>-blog-database.
  2. For Table, choose blog_hr_employees.
  3. For Columns, choose Exclude columns.
  4. For Exclude columns, choose salary and phone_number.
  5. For Table permissions, select Select.

You should receive a confirmation on the console that says Permission granted for: datalake_user to Exclude: <yourPrefix>-blog-database.blog_hr_employees.[salary, phone_number].

You can also verify that appropriate permission is reflected for the QuickSight user on the Lake Formation console by navigating to the Permissions tab and filtering for your database and table.

You can also specify column-level permissions in the AWS CLI with the following code:

aws lakeformation grant-permissions --principal DataLakePrincipalIdentifier=<QuickSight User ARN> --permissions "SELECT" --resource '{ "TableWithColumns": {"DatabaseName":"<yourPrefix>-blog-database", "Name":"blog_hr_employees", "ColumnWildcard": {"ExcludedColumnNames": ["salary", "phone_number"]}}}'  --region us-west-2 --profile datalake_admin

Creating visualizations in QuickSight

In this step, you use QuickSight to access the blog_hr_employees table in your data lake. While accessing this dataset from QuickSight, you can see that QuickSight doesn’t show the salary and phone_number columns, which you excluded from the source table in the previous step.

  1. Log in to QuickSight using the datalake_user IAM user.
  2. Choose New analysis.
  3. Choose New dataset.
  4. For the data source, choose Athena.

  1. For your data source name, enter Athena-HRDB.
  2. For Database, choose <yourPrefix>-blog-database.
  3. For Tables, select blog_hr_employees.
  4. Choose Select.

  1. Choose Import to SPICE for quicker analysis or Directly query your data.

For this use case, choose Import to SPICE. This provides faster visualization in a production setup, and you can run a scheduled refresh to make sure your dashboards are referring to the current data. For more information, see Scheduled Refresh for SPICE Data Sets on Amazon QuickSight.

When you complete the previous steps, your data is imported to your SPICE machine and you arrive at the QuickSight visualization dashboard. You can see that SPICE has excluded the salary and phone_number fields from the table. In the following screenshot, we created a pie chart visualization to show how many employees are present in each department.

Cleaning up

To avoid incurring future charges, delete the resources you created in this walkthrough, including your S3 bucket, Aurora cluster, and other associated resources.

Conclusion

Restricting access to sensitive data to various users in a data lake is a very common challenge. In this post, we demonstrated how to use Lake Formation to enforce column-level access to QuickSight dashboard users. You can enhance security further with Athena workgroups. For more information, see Creating a Data Set Using Amazon Athena Data and Benefits of Using Workgroups.

 


About the Author

Avijit Goswami is a Sr. Startups Solutions Architect at AWS, helping startup customers become tomorrow’s enterprises. When not at work, Avijit likes to cook, travel, watch sports, and listen to music.

 

 

How Wind Mobility built a serverless data architecture

Post Syndicated from Pablo Giner original https://aws.amazon.com/blogs/big-data/how-wind-mobility-built-a-serverless-data-architecture/

Guest post by Pablo Giner, Head of BI, Wind Mobility.

Over the past few years, urban micro-mobility has become a trending topic. With the contamination indexes hitting historic highs, cities and companies worldwide have been introducing regulations and working on a wide spectrum of solutions to alleviate the situation.

We at Wind Mobility strive to make commuters’ life more sustainable and convenient by bringing short distance urban transportation to cities worldwide.

At Wind Mobility, we scale our services at the same pace as our users demand them, and we do it in an economically and environmentally viable way. We optimize our fleet distribution to avoid overcrowding cities with more scooters than those that are actually going to be used, and we position them just meters away from where our users need them and at the time of the day when they want them.

How do we do that? By optimizing our operations to their fullest. To do so, we need to be very well informed about our users’ behavior under varying conditions and understand our fleet’s potential.

Scalability and flexibility for rapid growth

We knew that before we could solve this challenge, we needed to collect data from many different sources, such as user interactions with our application, user demand, IoT signals from our scooters, and operational metrics. To analyze the numerous datasets collected and extract actionable insights, we needed to build a data lake. While the high-level goal was clear, the scope was less so. We were working hard to scale our operation as we continued to launch new markets. The rapid growth and expansion made it very difficult to predict the volume of data we would need to consume. We were also launching new microservices to support our growth, which resulted in more data sources to ingest. We needed an architecture that allowed us to be agile and adapt quickly to meet our growth. It became clear that a serverless architecture was best positioned to meet those needs, so we started to design our 100% serverless infrastructure.

The first challenge was ingesting and storing data from our scooters in the field, events from our mobile app, operational metrics, and partner APIs. We use AWS Lambda to capture changes in our operational databases and mobile app and push the events to Amazon Kinesis Data Streams, which allows us to take action in real time. We also use Amazon Kinesis Data Firehose to write the data to Amazon Simple Storage Service (Amazon S3), which we use for analytics.
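
As a generic illustration of this ingestion pattern (not Wind Mobility's actual code), a Lambda function forwarding change or application events into a Kinesis data stream can be as simple as the following; the stream name and partition key field are assumptions.

    import json
    import boto3

    kinesis = boto3.client("kinesis")
    STREAM_NAME = "events-stream"  # placeholder stream name

    def handler(event, context):
        # Forward each incoming change or app event record to Kinesis Data Streams
        records = event.get("Records", [])
        for record in records:
            kinesis.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(record).encode("utf-8"),
                PartitionKey=record.get("eventID", "default"),
            )
        return {"forwarded": len(records)}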

After the data was in Amazon S3 and adequately partitioned for its most common use cases (we partition by date, region, and business line, depending on the data source), we had to find a way to query this data for both data profiling (understanding structure, content, and interrelationships) and ad hoc analysis. For that, we chose AWS Glue crawlers to catalog our data and Amazon Athena to read from the AWS Glue Data Catalog and run queries. However, ad hoc analysis and data profiling are relatively sporadic tasks in our team, because most of the data processing computing hours are actually dedicated to transforming the multiple data sources into our data warehouse, consolidating the raw data, modeling it, adding new attributes, and picking the data elements, which constitute 95% of our analytics and predictive needs.

This is where all the heavy lifting takes place. We parse through millions of scooter and user events generated daily (over 300 events per second) to extract actionable insight. We selected AWS Glue to perform this task. Our primary ETL job reads the newly added raw event data from Amazon S3, processes it using Apache Spark, and writes the results to our Amazon Redshift data warehouse. AWS Glue plays a critical role in our ability to scale on demand. After careful evaluation and testing, we concluded that AWS Glue ETL jobs meet all our needs and free us from procuring and managing infrastructure.

Architecture overview

The following diagram represents our current data architecture, showing two serverless data collection, processing, and reporting pipelines:

  • Operational databases from Amazon Relational Database Service (Amazon RDS) and MongoDB
  • IoT and application events, followed by Athena for data profiling and Amazon Redshift for reporting

Our data is curated and transformed multiple times a day using an automated pipeline running on AWS Glue. The team can now focus on analyzing the data and building machine learning (ML) applications.

We chose Amazon QuickSight as our business intelligence tool to help us visualize and better understand our operational KPIs. Additionally, we use Amazon Elastic Container Registry (Amazon ECR) to store our Docker images containing our custom ML algorithms and Amazon Elastic Container Service (Amazon ECS) where we train, evaluate, and host our ML models. We schedule our models to be trained and evaluated multiple times a day. Taking as input curated data about demand, conversion, and flow of scooters, we run the models to help us optimize fleet utilization for a particular city at any given time.

The following diagram represents how data from the data lake is incorporated into our ML training, testing, and serving system. First, our developers work in the application code and commit their changes, which are built into new Docker images by our CI/CD pipeline and stored in the Amazon ECR registry. These images are pushed into Amazon ECS and tested in DEV and UAT environments before moving to PROD (where they are triggered by the Amazon ECS task scheduler). During their execution, the Amazon ECS tasks (some train the demand and usage forecasting models, some produce the daily and hourly predictions, and others optimize the fleet distribution to satisfy the forecast) read their configuration and pull data from Amazon S3 (which has been previously produced by scheduled AWS Glue jobs), finally storing their results back into Amazon S3. Executions of these pipelines are tracked via MLFlow (in a dedicated Amazon Elastic Compute Cloud (Amazon EC2) server) and the final result indicating the fleet operations required is fit into a Kepler map, which is then consumed by the operators on the field.

Conclusion

We at Wind Mobility place data at the forefront of our operations. For that, we need our data infrastructure to be as flexible as the industry and the context we operate in, which is why we chose serverless. Over the course of a year, we have built a data lake, a data warehouse, a BI suite, and a variety of (production) data science applications. All of that with a very small team.

Also, within the last 12 months, we have scaled up several of our data pipelines by a factor of 10, without slowing our momentum or redesigning any part of our architecture. When it came to double our fleet in 1 week and increase the frequency at which we capture data from scooters by a factor of 10, our serverless data architecture scaled with no issues. This allowed us to focus on adding value by simplifying our operation, reacting to changes quickly, and delighting our users.

We have measured our success in multiple dimensions:

  • Speed – Serverless is faster to deploy and expand; we believe we have reduced our time to market for the entire infrastructure by a factor of 2
  • Visibility – We have 360 degree visibility of our operations worldwide, accessible by our city managers, finance team, and management board
  • Optimized fleet deployment – We know, at any minute of the day, the number of scooters that our customers need over the next few hours, which reduces unsatisfied demand by more than 50%

If you face a similar challenge, our advice is clear: go fully serverless and use the spectrum of solutions available from AWS.

Follow us and discover more about Wind Mobility on Facebook, Instagram and LinkedIn.

 


About the Author

Pablo Giner is Head of BI at Wind Mobility. Pablo’s background is in wheels (motorcycle racing > vehicle engineering > collision insurance > eScooters sharing…) and for the last few years he has specialized in forming and developing data teams. At Wind Mobility, he leads the data function (data engineering + analytics + data science), and the project he is most proud of is what they call smart fleet rebalancing, an AI backed solution to reposition their fleet in real-time. “In God we trust. All others must bring data.” – W. Edward Deming

 

 

 

Process data with varying data ingestion frequencies using AWS Glue job bookmarks

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/process-data-with-varying-data-ingestion-frequencies-using-aws-glue-job-bookmarks/

We often have data processing requirements in which we need to merge multiple datasets with varying data ingestion frequencies. Some of these datasets are ingested one time in full, received infrequently, and always used in their entirety, whereas other datasets are incremental, received at certain intervals, and joined with the full datasets to generate output. To address this requirement, this post demonstrates how to build an extract, transform, and load (ETL) pipeline using AWS Glue.

Using AWS Glue

AWS Glue provides a serverless environment to extract, transform, and load a large number of datasets from several sources for analytics purposes. It has a feature called job bookmarks to process incremental data when rerunning a job on a scheduled interval. A job bookmark is composed of the states for various job elements, such as sources, transformations, and targets. This is done by persisting state information from a job run that helps AWS Glue prevent reprocessing old data.

For this use case, we use an AWS Glue job with job bookmarks enabled to process files received in varying frequencies (a full dataset signifying files that are received one time, and incremental datasets signifying files that are received in certain regular intervals). These files are merged together. In addition to enabling job bookmarks, we also use an optional parameter transformation_ctx (transformation context) in an AWS Glue PySpark dynamic frame. This acts as a unique identifier for the ETL operator instance to identify state information within a job bookmark for a given operator. AWS Glue uses transformation_ctx to index the key to the bookmark state.

You can capture and maintain state information for incremental datasets and avoid reprocessing by using transformation context. Transformation context is omitted for the full dataset file, which results in the job run state information not getting captured for the full dataset and allowing it to participate in the next processing event in its entirety. Even though the job bookmark flag is enabled at the AWS Glue job level, because transformation context is omitted for the full dataset, every time the job runs, the entire data from the full dataset is used as part of the job. In contrast, only the newly arrived datasets are processed for the incremental datasets.

Solution overview

To demonstrate the job bookmark utility of AWS Glue, we use TLC Trip Record Data datasets. We use NYC yellow taxi trip data monthly files as the incremental dataset, and NYC taxi zone lookup as the full dataset. The monthly yellow taxi trip data has a field named PULocationID (where a customer was picked up), which is joined with the LocationID field from the NYC taxi zone lookup file to create an output dataset that contains Borough, Zone, and service_zone from the NYC taxi zone lookup dataset and all the fields (except the PULocationID field) from the monthly NYC taxi trip data file.

The following diagram depicts a high-level architecture of the process.

The diagram includes the following components:

  • Two raw Amazon S3 bucket locations store the incoming CSV source data: the NYC taxi monthly files (incremental dataset) and the NYC taxi zone lookup file (full dataset).
  • A bookmark-enabled AWS Glue job joins the monthly trip data file with the taxi zone lookup file, generates output Parquet files, and creates the NYC taxi trip table in the AWS Glue Data Catalog and the Amazon Redshift database.
  • A curated S3 bucket stores the processed monthly NYC taxi Parquet files.

Creating the AWS CloudFormation stack

You use the following AWS CloudFormation template to create the resources for this solution in your preferred AWS account and Region:

Additionally, make sure you have an Amazon EC2 key pair created in the account and Region you’re launching the stack from.

To provide the stack parameters, complete the following steps:

  1. For Stack name, enter BigDataBlog-GlueBookmark-Stack.

  1. For RedshiftClusterIdentifier, enter bigdatablogrscluster.
  2. For NodeType, choose large.
  3. For NumberOfNodes, choose 2.
  4. For DatabaseName, enter bigdatablogdev.

  1. For MasterUserName, enter bigdatabloguser.
  2. For MasterUserPassword, enter a password for the master user account.
  3. For Maintenancewindow, enter sun:05:00-sun:05:30.
  4. For EC2InstanceType, choose micro.
  5. For SubscriptionEmail, enter your preferred email.
  6. For MyIPAddressCidr, enter your IP address.

You can find your IP address by browsing to https://www.whatismyip.com/ and looking up the value shown for My Public IPv4. Append /32 to the address to make it CIDR-compatible and most restrictive.

  1. For DestinationPrefixListId, enter your prefix list ID.

To find your ID, set AWS credentials by entering aws configure in the command prompt. Run aws ec2 describe-prefix-lists to get the PrefixListId where PrefixListName is com.amazonaws.<<AWS region>>.s3 from the output.

  1. For NewS3BucketName, enter the name of your S3 bucket.

  1. For gluedatabase, enter bigdatabloggluedb.
  2. For EC2KeyName, enter the name of your key pair.

For instructions on creating a stack, see Creating a Stack on the AWS CloudFormation Console.

Make sure the stack is complete before moving to the next steps.

Creating the AWS Glue job

To create your AWS Glue job, complete the following steps:

  1. Download NYC yellow monthly trip data for October 2019 and November 2019 and save them under the s3://<<Your S3 Bucket>>/tripdata/ prefix.
  2. Download the NYC Taxi Zone lookup table and save it under the s3://<<Your S3 Bucket>>/tripdata-lookup/ prefix.
  3. Use the following PySpark script and change the piece of the code enclosed inside <<…>>.

You can find the values for the following keys on the Outputs tab for the CloudFormation stack:

    • S3Bucket
    • Snstopic

You can find the values for the following keys on the Parameters tab for the CloudFormation stack:

    • EC2KeyName
    • MyIPAddressCidr
    • NewS3BucketName
    • SubscriptionEmail

  1. When the AWS Glue script is ready, upload it to the S3 bucket under the s3://<<Your S3 Bucket>>/glue-script/ prefix.

You refer to this when you create the AWS Glue job.

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Create job.
  3. For Name, enter a name for the job. For more information about AWS Glue job naming, see Jobs.
  4. For IAM role, choose the role the CloudFormation template created. Use the value for the key Glueaccessrole from the stack outputs.
  5. For Type, choose Spark.
  6. For Glue version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  7. For This job runs, choose An existing script that you provide.
  8. For S3 path where the script is stored, choose the script file that you saved earlier under the s3://<<Your S3 Bucket>>/Glue-script/ prefix.
  9. In the Advanced properties section, for Job bookmark, choose Enable.
  10. For Catalog options, select Use Glue Data Catalog as the Hive metastore.
  11. For Connections, enter the value of the key GlueConnection from the stack outputs.
  12. Choose Save job and edit script.

Creating an Amazon Redshift database schema

Before you run the AWS Glue job, you need to connect to the Amazon Redshift cluster and create an Amazon Redshift database schema named Glue_bookmark_redshift_schema. To connect to the cluster, use one of the JDBC client-based SQL tools, such as SQL Workbench/J. For instructions, see How can I access a private Amazon Redshift cluster from my local machine?

To access the cluster, you use the Amazon Redshift master user bigdatabloguser (the value for MasterUserName on the Parameters tab of the CloudFormation stack) and the password you provided when creating the stack.

Running AWS Glue job

The AWS Glue job takes only one argument: the name of the file being processed. Pass the file name, such as yellow_tripdata_2019-10.csv, when processing that file. This enables you to track the records that belong to a specific file, which makes it easier to evaluate the results of multiple job runs using different files.
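
Inside the job, that argument can be read with getResolvedOptions; the argument name file_name below is an assumption, so match it to whatever your script defines.

    import sys
    from awsglue.utils import getResolvedOptions

    # 'file_name' is an assumed job argument holding the monthly file to process,
    # for example yellow_tripdata_2019-10.csv
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"])
    file_name = args["file_name"]
    print("Processing " + file_name)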

When the Glue job run is successful, you can see the output Parquet files under the /tripdata-joined-output/ prefix inside the S3 bucket you created by running the CloudFormation template. You can also use Amazon Athena to query the data from the table created in the Data Catalog. For more information, see Running SQL Queries Using Amazon Athena.

Query the Amazon Redshift database table named redshift_bookmark_table and review the output.

Explaining the solution

A bookmark-enabled AWS Glue job (in PySpark) is created that reads the NYC yellow taxi trip’s monthly file, joins it with the NYC taxi zone lookup file, produces files in Parquet format, and saves them to an Amazon S3 location.

A Data Catalog table is created that refers to the Parquet files’ location in Amazon S3. The resulting dataset is also loaded into an Amazon Redshift table using the AWS Glue PySpark job.

The AWS Glue job bookmark transformation context is used while the AWS Glue dynamic frame is created by reading a monthly NYC taxi file, whereas the transformation context is disabled while reading and creating the dynamic frame for the taxi zone lookup file (because the entire file is required for processing each monthly trip file). This allows you to process each monthly trip file exactly one time and reuse the entire taxi zone lookup file as many times as required because the missing transformation context for the lookup file doesn’t allow the bookmark context to be set for that file.

When a new NYC trip data monthly file arrives and the AWS Glue job runs, it only processes the newly arrived monthly file and ignores any previously processed monthly files. Similarly, when the Data Catalog table data is copied into Amazon Redshift, it only copies the newly processed underlying Parquet files’ data and appends it to the Amazon Redshift table. At this time the transformation context is enabled to utilize the job bookmark, and the AWS Glue dynamic frame is created by reading the Data Catalog table.

The following PySpark code uses the transformation context to create an AWS Glue dynamic frame while reading the monthly incremental file:

taxidata = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths": [InputDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'},transformation_ctx = "taxidata")

However, the following code omits transformation context when creating the AWS Glue dynamic frame for the lookup file:

Lookupdata  = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths":[InputLookupDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'})

Additionally, the following code uses the transformation context while reading the Data Catalog table, which is loaded into an Amazon Redshift table:

datasource0 = GlueContext.create_dynamic_frame.from_catalog(database = Glue_catalog_database, table_name = Glue_table_name, transformation_ctx = "datasource0")
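
Putting the pieces together, a simplified version of the join and the Parquet write might look like the following; the output path and transformation_ctx name are assumptions, and the actual script also handles the Amazon Redshift load and other steps.

    from awsglue.transforms import DropFields, Join

    # Join the bookmarked monthly trip data with the non-bookmarked zone lookup
    joined = Join.apply(taxidata, Lookupdata, "PULocationID", "LocationID")
    joined = DropFields.apply(frame=joined, paths=["PULocationID"])

    # Write the joined result to the curated S3 location as Parquet
    GlueContext.write_dynamic_frame.from_options(
        frame=joined,
        connection_type="s3",
        connection_options={"path": OutputDir},  # OutputDir is an assumed variable
        format="parquet",
        transformation_ctx="joined_output",
    )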

You can see in the screenshot below that the 2019 October yellow taxi trip data file has arrived for processing (the incremental dataset).

To process each month’s data, you need the taxi zone lookup (full dataset).

The following screenshot shows the output of the AWS Glue job after processing the 2019 October trip data, saved in Parquet format.

The following two screenshots show the Amazon Redshift table: the first displays the record count for the October 2019 taxi data, and the second confirms that only the October 2019 taxi data file has been processed so far.

The following screenshot shows that the November 2019 NYC taxi data file has arrived for processing.

The following screenshot shows the output of the AWS Glue job after processing the 2019 November trip data, saved in Parquet format. The job processed only the November data and did not reprocess the October data, because the job bookmark and transformation context were enabled.

The following screenshot shows that the Amazon Redshift table now has both October and November data and shows the total record count.

The following screenshot shows individual record count for each month.

Querying with Athena

You can also review the dataset in Athena, which uses the same Glue Data Catalog. The following screenshot of an Athena query shows the Data Catalog table has both October and November data, with the total record count.

The following screenshot of an Athena query shows the individual record count for each month.

The following screenshot shows the location information, including borough, zone, and service zone, which is available in the taxi zone lookup and is joined with the October taxi trip data.

The following screenshot shows the output for the same query on the November data.

Cleaning up

When you’re done using this solution, you should delete the CloudFormation stack to avoid incurring any further charges.

Conclusion

This post describes how you can merge datasets received in different frequencies as part of your ETL pipeline processing using AWS Glue job bookmarks. The use case demonstrated how to use job bookmarks and transformation context to build an ETL pipeline for processing several incremental datasets.

 


About the Authors

Dipankar is a Senior Data Architect with AWS Professional Services, helping customers build analytics platform and solutions. He has a keen interest in distributed computing. Dipankar enjoys spending time playing chess and watching old Hollywood movies.

 

 

 

Ashok Padmanabhan is a big data consultant with AWS Professional Services, helping customers build big data and analytics platform and solutions. When not building and designing data lakes, Ashok enjoys spending time at beaches near his home in Florida.

Moovit embraces data lake architecture by extending their Amazon Redshift cluster to analyze billions of data points every day

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/moovit-embraces-data-lake-architecture-by-extending-their-amazon-redshift-cluster-to-analyze-billions-of-data-points-every-day/

Amazon Redshift is a fast, fully managed, cloud-native data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence tools.

Moovit is a leading Mobility as a Service (MaaS) solutions provider and maker of the top urban mobility app. Guiding over 800 million users in more than 3,200 cities across 103 countries to get around town effectively and conveniently, Moovit has experienced exponential growth of their service in the last few years. The company amasses up to 6 billion anonymous data points a day to add to the world’s largest repository of transit and urban mobility data, aided by Moovit’s network of more than 685,000 local editors that help map and maintain local transit information in cities that would otherwise be unserved.

Like Moovit, many companies today are using Amazon Redshift to analyze data and perform various transformations on the data. However, as data continues to grow and become even more important, companies are looking for more ways to extract valuable insights from the data, such as big data analytics, numerous machine learning (ML) applications, and a range of tools to drive new use cases and business processes. Companies want all of their users to be able to access all of their data, all the time, and get fast answers. The best solution for all those requirements is for companies to build a data lake, which is a centralized repository that allows you to store all your structured, semi-structured, and unstructured data at any scale.

With a data lake built on Amazon Simple Storage Service (Amazon S3), you can easily run big data analytics using services such as Amazon EMR and AWS Glue. You can also query structured data (such as CSV, Avro, and Parquet) and semi-structured data (such as JSON and XML) by using Amazon Athena and Amazon Redshift Spectrum. You can also use a data lake with ML services such as Amazon SageMaker to gain insights.

Moovit uses an Amazon Redshift cluster to allow different company teams to analyze vast amounts of data. They wanted a way to extend the collected data into the data lake and allow additional analytical teams to access more data to explore new ideas and business cases.

Additionally, Moovit was looking to manage their storage costs and evolve to a model that keeps cooler data in Amazon S3 at the lowest cost and the hottest data in Amazon Redshift for the most efficient query performance. The proposed solution implemented a hot/cold storage pattern using Amazon Redshift Spectrum and reduced local disk utilization on the Amazon Redshift cluster to keep costs under control. Moovit is currently evaluating the new RA3 node type with managed storage as an additional level of flexibility that will allow them to easily scale the amount of hot/cold storage without limit.

In this post we demonstrate how Moovit, with the support of AWS, implemented a lake house architecture by employing the following best practices:

  • Unloading data into Amazon Simple Storage Service (Amazon S3)
  • Instituting a hot/cold pattern using Amazon Redshift Spectrum
  • Using AWS Glue to crawl and catalog the data
  • Querying data using Athena

Solution overview

The following diagram illustrates the solution architecture.

The solution includes the following steps:

  1. Unload data from Amazon Redshift to Amazon S3
  2. Create an AWS Glue Data Catalog using an AWS Glue crawler
  3. Query the data lake in Amazon Athena
  4. Query Amazon Redshift and the data lake with Amazon Redshift Spectrum

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  1. An AWS account.
  2. An Amazon Redshift cluster.
  3. The following AWS services and access: Amazon Redshift, Amazon S3, AWS Glue, and Athena.
  4. The appropriate AWS Identity and Access Management (IAM) permissions for Amazon Redshift Spectrum and AWS Glue to access Amazon S3 buckets. For more information, see IAM policies for Amazon Redshift Spectrum and Setting up IAM Permissions for AWS Glue.

Walkthrough

To demonstrate the process Moovit used to build their data architecture, we use the industry-standard TPC-H dataset provided publicly by the TPC organization.

The Orders table has the following columns:

Column            Type
O_ORDERKEY        int4
O_CUSTKEY         int4
O_ORDERSTATUS     varchar
O_TOTALPRICE      numeric
O_ORDERDATE       date
O_ORDERPRIORITY   varchar
O_CLERK           varchar
O_SHIPPRIORITY    int4
O_COMMENT         varchar
SKIP              varchar

Unloading data from Amazon Redshift to Amazon S3

Amazon Redshift allows you to unload your data using a data lake export to an Apache Parquet file format. Parquet is an efficient open columnar storage format for analytics. Parquet format is up to twice as fast to unload and consumes up to six times less storage in Amazon S3, compared with text formats.

To unload cold or historical data from Amazon Redshift to Amazon S3, you need to run an UNLOAD statement similar to the following code (substitute your IAM role ARN):

UNLOAD ('select o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, skip
FROM tpc.orders
ORDER BY o_orderkey, o_orderdate') 
TO 's3://tpc-bucket/orders/' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::<account_number>:role/<Role>'
FORMAT AS PARQUET ALLOWOVERWRITE PARTITION BY (o_orderdate);

It is important to define a partition key or column that minimizes Amazon S3 scans as much as possible based on the query patterns intended. The query pattern is often by date ranges; for this use case, use the o_orderdate field as the partition key.

Another important recommendation when unloading is to keep file sizes between 128 MB and 512 MB. By default, the UNLOAD command splits the results into one or more files per node slice (a virtual worker in the Amazon Redshift cluster), which takes advantage of the Amazon Redshift MPP architecture. However, this can cause the files created by each slice to be small. In Moovit’s use case, the default UNLOAD using PARALLEL ON yielded dozens of small (megabyte-sized) files. For Moovit, PARALLEL OFF yielded the best results because it aggregated all the slices’ work on the leader node and wrote it out as a single stream, controlling the file size with the MAXFILESIZE option.
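
If you want to run such an UNLOAD outside a SQL client, one option is the Amazon Redshift Data API. The following boto3 sketch is illustrative only: the cluster identifier, database, user, bucket, and IAM role are placeholders, and the statement mirrors the earlier example with the PARALLEL OFF and MAXFILESIZE options added.

import boto3

# Placeholder cluster, database, user, bucket, and role values: replace with your own.
# PARALLEL OFF writes a single output stream from the leader node, and MAXFILESIZE keeps
# each output file within the recommended 128 MB - 512 MB range.
unload_sql = """
    UNLOAD ('SELECT * FROM tpc.orders ORDER BY o_orderkey, o_orderdate')
    TO 's3://tpc-bucket/orders/'
    IAM_ROLE 'arn:aws:iam::<account_number>:role/<Role>'
    FORMAT AS PARQUET
    ALLOWOVERWRITE
    PARALLEL OFF
    MAXFILESIZE 256 MB;
"""

redshift_data = boto3.client('redshift-data')
redshift_data.execute_statement(
    ClusterIdentifier='my-redshift-cluster',
    Database='dev',
    DbUser='awsuser',
    Sql=unload_sql
)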

Another performance enhancement applied in this use case was the use of Parquet’s min and max statistics. Parquet files have min_value and max_value column statistics for each row group that allow Amazon Redshift Spectrum to prune (skip) row groups that are out of scope for a query (range-restricted scan). To use row group pruning, you should sort the data by frequently-used columns. Min/max pruning helps scan less data from Amazon S3, which results in improved performance and reduced cost.

After unloading the data to your data lake, you can view your Parquet file’s content in Amazon S3 (assuming it’s under 128 MB). From the Actions drop-down menu, choose Select from.

You’re now ready to populate your Data Catalog using an AWS Glue crawler.

Creating a Data Catalog with an AWS Glue crawler

To query your data lake using Athena, you must catalog the data. The Data Catalog is an index of the location, schema, and runtime metrics of the data.

An AWS Glue crawler accesses your data store, extracts metadata (such as field types), and creates a table schema in the Data Catalog. For instructions, see Working with Crawlers on the AWS Glue Console.
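
If you prefer to script this step, you can also create and start the crawler with the AWS SDK. The following boto3 sketch is illustrative: the crawler name and IAM role are placeholders, while the database name and S3 path reuse the values from this walkthrough.

import boto3

glue = boto3.client('glue')

# Placeholder crawler name and role; the database and S3 path match the walkthrough
glue.create_crawler(
    Name='orders-data-lake-crawler',
    Role='arn:aws:iam::<account_number>:role/MyGlueCrawlerRole',
    DatabaseName='datalake',
    Targets={'S3Targets': [{'Path': 's3://tpc-bucket/orders/'}]}
)

# Run the crawler once to populate the Data Catalog table
glue.start_crawler(Name='orders-data-lake-crawler')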

Querying the data lake in Athena

After you create the crawler, you can view the schema and tables in AWS Glue and Athena, and can immediately start querying the data in Athena. The following screenshot shows the table in the Athena Query Editor.

Querying Amazon Redshift and the data lake using a unified view with Amazon Redshift Spectrum

Amazon Redshift Spectrum is a feature of Amazon Redshift that allows multiple Amazon Redshift clusters to query the same data in the data lake. It enables the lake house architecture and allows data warehouse queries to reference data in the data lake as they would any other table. Amazon Redshift clusters transparently use the Amazon Redshift Spectrum feature when a SQL query references an external table stored in Amazon S3. Multiple large queries can run in parallel by using Amazon Redshift Spectrum on external tables to scan, filter, aggregate, and return rows from Amazon S3 back to the Amazon Redshift cluster.

Following best practices, Moovit decided to persist all their data in their Amazon S3 data lake and only store hot data in Amazon Redshift. They could query both hot and cold datasets in a single query with Amazon Redshift Spectrum.

The first step is creating an external schema in Amazon Redshift that maps a database in the Data Catalog. See the following code:

CREATE EXTERNAL SCHEMA spectrum 
FROM data catalog 
DATABASE 'datalake' 
iam_role 'arn:aws:iam::<account_number>:role/mySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

After the crawler creates the external table, you can start querying in Amazon Redshift using the mapped schema that you created earlier. See the following code:

SELECT * FROM spectrum.orders;

Lastly, create a late binding view that unions the hot and cold data:

CREATE OR REPLACE VIEW lake_house_joint_view AS (
SELECT * FROM public.orders WHERE o_orderdate >= dateadd('day',-90,date_trunc('day',getdate()))
UNION ALL
SELECT * FROM spectrum.orders WHERE o_orderdate < dateadd('day',-90,date_trunc('day',getdate()))
) WITH NO SCHEMA BINDING;

Summary

In this post, we showed how Moovit unloaded data from Amazon Redshift to a data lake. By doing that, they exposed the data to many additional groups within the organization and democratized the data. These benefits of data democratization are substantial because various teams within Moovit can access the data, analyze it with various tools, and come up with new insights.

As an additional benefit, Moovit reduced the storage used on their Amazon Redshift cluster, which allowed them to maintain their cluster size and avoid additional spending by keeping all historical data in the data lake and only hot data in the Amazon Redshift cluster. Keeping only hot data on the Amazon Redshift cluster also means Moovit no longer needs to delete data frequently, which saves IT resources, time, and effort.

If you are looking to extend your data warehouse to a data lake and leverage various tools for big data analytics and machine learning (ML) applications, we invite you to try out this walkthrough.

 


About the Authors

Yonatan Dolan is a Business Development Manager at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value.

Alon Gendler is a Startup Solutions Architect at Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud.

Vincent Gromakowski is a Specialist Solutions Architect for Amazon Web Services.


Analyzing Google Analytics data with Amazon AppFlow and Amazon Athena

Post Syndicated from Shimura Makoto original https://aws.amazon.com/blogs/big-data/analyzing-google-analytics-data-with-amazon-appflow-and-amazon-athena/

Software as a service (SaaS) applications are rapidly growing in importance, and the data they hold is essential to include when performing analytics to influence business decisions. Amazon AppFlow is a fully managed integration service that helps you transfer SaaS data to your data lake securely. You can run data transfer flows on demand, on a schedule, or after an event. You can quickly analyze this data using Amazon Athena and join it with numerous datasets already stored on Amazon Simple Storage Service (Amazon S3). You can also join multiple SaaS datasets and combine them with operational data sitting in traditional databases such as Amazon Relational Database Service (Amazon RDS) via the Athena federated query feature.

This post walks you through extracting Google Analytics data using Amazon AppFlow and storing it in Amazon S3 so you can query it with Athena.

Architecture overview

The following diagram shows the flow described in this post. You first create a new flow inside Amazon AppFlow to transfer Google Analytics data to Amazon S3. The format of transferred data is multi-line JSON, which Athena doesn’t support. An AWS Lambda function transforms this JSON format file into Apache Parquet format. This transformation enables you to run a query efficiently and cost-effectively. This function can also include other transformations, such as Amazon S3 prefix changes and storing the data using Hive style partitions. Amazon AppFlow supports scheduled jobs to extract only new data, so you can develop an automated workflow using an Amazon S3 event trigger and a transformation Lambda function. Amazon AppFlow is currently available in 15 Regions; pick the Region where your S3 bucket is located. In this walkthrough, you use US East (N. Virginia).

In this post, you use a sample Google account, an OAuth client with appropriate permissions, and Google Analytics data. You can also use your own Google resources. To enable Google Analytics access from Amazon AppFlow, you need to set up a new OAuth client in advance. Complete the following steps:

  1. On the Google API Console (https://console.developers.google.com), choose Library.
  2. Enter analytics in the search field.
  3. Choose Google Analytics API.
  4. Choose ENABLE and return to the previous page.
  5. Choose Google Analytics Reporting API listed in the search results.
  6. Choose ENABLE and return to the main page.
  7. Choose OAuth consent screen.
  8. Create a new Internal app (if you’re using your personal account, choose External).
  9. Add amazon.com as an authorized domain.
  10. Choose Add scope.
  11. Add ../auth/analytics.readonly as Scopes for Google APIs.
  12. Choose Save.
  13. Choose Credentials.
  14. Add OAuth client ID credentials.
  15. Choose Web application.
  16. Enter https://console.aws.amazon.com/ as an authorized JavaScript origins URL.
  17. Enter https://AWSREGION.console.aws.amazon.com/appflow/oauth as an authorized redirect URL. (Replace AWSREGION with the Region you’re working in. If you’re using Amazon AppFlow in us-east-1, enter https://console.aws.amazon.com/appflow/oauth.)
  18. Choose Save.

Setting up Lambda and Amazon S3

You need to start by creating a new S3 bucket as your Amazon AppFlow transfer destination. Then you develop a new Lambda function to transform JSON format data into Parquet format using pandas and pyarrow modules. Finally, you set an Amazon S3 event trigger to automatically call the Lambda function when a new Amazon S3 object is created.

Creating a new S3 bucket

To create an Amazon S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, appflow-ga-sample.
  3. Choose Create bucket.

Preparing a .zip file for your Lambda layer

To create a .zip file that includes pandas and pyarrow module, complete the following steps:

  1. Set up any environment that can run Docker.
  2. Run the following command:
mkdir python
docker run -it --rm -v $(pwd)/python:/python python:3.6.8 pip install -t /python pandas==0.23.4 pyarrow==0.11.1
zip -r pandas-pyarrow.zip python
  3. On the Amazon S3 console, choose appflow-ga-sample.
  4. Choose Create folder.
  5. Enter a name for your folder; for example, lambda-layer.
  6. Choose Save.
  7. Choose lambda-layer.
  8. Choose Upload.
  9. Choose pandas-pyarrow.zip and choose Upload.

Creating a Lambda layer for Parquet export

To create a Lambda layer, complete the following steps:

  1. On the Lambda console, choose Layers.
  2. Choose Create layer.
  3. For name, enter a name for your layer; for example, pandas-parquet.
  4. Select Upload a file from Amazon S3.
  5. For Amazon S3 link URL, enter the Amazon S3 path for your .zip file; for example, s3://appflow-ga-sample/lambda-layer/pandas-pyarrow.zip.
  6. For Compatible runtimes, choose Python 3.6.
  7. Choose Create.

Creating a Lambda function for data transformation

To create a Lambda function and trigger an Amazon S3 event, complete the following steps:

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch.
  3. For Function name, enter a name for your function; for example, ga-converter.
  4. For Runtime, choose Python 3.6.
  5. Select Create a new role with basic Lambda permissions.
  6. Choose Create function.

  7. In the Lambda function configuration, enter the following code in the lambda_function area under the Function code section.

This Lambda function downloads the Amazon AppFlow output file, extracts the necessary data from the Google Analytics JSON file, and transforms it into Parquet format. Finally, it uploads the result to Amazon S3 again with a different key name. You can modify the script, especially the dimension and value names, or add any other transformations according to your needs.

import json
import boto3
import pandas as pd
from datetime import datetime as dt

def lambda_handler(event, context):
    # Identify the S3 object that triggered this invocation
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    # Download the Amazon AppFlow output file and parse the Google Analytics JSON
    s3_client = boto3.client('s3')
    raw_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    raw_data = json.loads(raw_object['Body'].read().decode('utf-8'))

    # Extract the date/hour, device, and user count from each report row
    record_dates = [dt.strptime(r['dimensions'][0], '%Y%m%d%H') for r in raw_data['reports'][0]['data']['rows']]
    devices = [r['dimensions'][1] for r in raw_data['reports'][0]['data']['rows']]
    user_counts = [int(r['metrics'][0]['values'][0]) for r in raw_data['reports'][0]['data']['rows']]
    df = pd.DataFrame({
        'year': [r.year for r in record_dates],
        'month': [r.month for r in record_dates],
        'day': [r.day for r in record_dates],
        'hour': [r.hour for r in record_dates],
        'device': devices,
        'user_count': user_counts
    })
    
    output_file = dt.now().strftime('%Y%m%d%H%M%S')
    output_path = '/tmp/{}.parquet'.format(output_file)
    df.to_parquet(output_path)

    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    bucket.upload_file(output_path, 'ga-data/{}.parquet'.format(output_file))
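
Before wiring up the S3 trigger, you can sanity-check the handler with a hand-built event that mimics the S3 notification structure the function expects. The bucket and key below are placeholders, and the referenced object must already exist in S3 for the get_object call to succeed.

# Hypothetical local test: simulate the S3 event that an Amazon AppFlow output file would generate
sample_event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'appflow-ga-sample'},
            'object': {'key': 'raw/sample-appflow-output.json'}
        }
    }]
}

if __name__ == '__main__':
    lambda_handler(sample_event, None)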

Adding layers

To add layers to your Lambda function, complete the following steps:

  1. Choose Layers in the Designer section.
  2. Choose Add a layer.
  3. Select Select from list of runtime compatible layers.
  4. For Name, choose pandas-pyarrow.
  5. For Version, choose 1.
  6. Choose Add.

Increasing your timeout setting

To increase the Lambda timeout setting, complete the following steps:

  1. On the Configuration tab, under Basic settings, choose Edit.
  2. Set 30 sec as Timeout.
  3. Choose Save.
  4. On the Configuration tab, choose Save.

Adding an event trigger

To add an Amazon S3 event trigger, complete the following steps:

  1. In the Designer section, choose Add trigger.
  2. Choose S3, and choose the bucket you created.
  3. For Event type, choose All object create events.
  4. For Prefix, enter raw.
  5. Choose Add.

Adding permissions

To add appropriate permissions for this Lambda function to read and write Amazon S3 objects, complete the following steps:

  1. On the Permissions tab, note the execution role name; for example, ga-converter-role-zm6u0f4g.
  2. On the AWS Identity and Access Management (IAM) console, choose Policies.
  3. Choose Create Policy.
  4. On the JSON tab, enter the following policy (replace the target bucket name arn:aws:s3:::appflow-ga-sample* with your own bucket name).
{	
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
             ],
            "Resource": [
                "arn:aws:s3:::appflow-ga-sample*"
            ]
        }
    ]
}
  5. Choose Review policy.
  6. Enter a name for your new policy; for example, lambda-s3-ga-converter-policy.
  7. Choose Create policy.
  8. On the IAM console, choose Roles.
  9. Enter your role name (ga-converter-role-zm6u0f4g) in the search field.
  10. Choose your role.
  11. Choose Attach policies.
  12. Choose lambda-s3-ga-converter-policy.
  13. Choose Attach policy.

Setting up Amazon AppFlow

Now you can create a new Amazon AppFlow flow to transfer from Google Analytics to Amazon S3. To create a new Amazon AppFlow transfer flow, complete the following steps:

  1. On the Amazon AppFlow console, choose Create flow.
  2. Enter a name for your flow; for example, my-ga-flow.
  3. Choose Next.
  4. For Source name, choose Google Analytics.
  5. Choose Create new connection.
  6. Enter your OAuth client ID and client secret, then name your connection; for example, ga-connection.
  7. In the pop-up window, choose to allow amazon.com access to the Google Analytics API.
  8. For Choose Google Analytics object, choose Reports.
  9. For Choose Google Analytics view, choose All Web Site Data.
  10. For Destination name, choose Amazon S3.
  11. For Bucket details, choose the bucket you created.
  12. Enter raw as a prefix.
  13. Select Run on demand.

  14. Choose Next.
  15. Select Manually map fields.
  16. Select the following three fields for Source field name:
    • Time: DIMENSION: ga:dateHour
    • Platform or Device: DIMENSION: ga:deviceCategory
    • User: METRIC: ga:users
  17. Choose Map fields directly.

  18. Choose Next.
  19. In the Add filters section, choose Next.
  20. Choose Create flow.

Running the flow

After creating your new flow, you can run it on demand:

  1. On the Amazon AppFlow console, choose my-ga-flow.
  2. Choose Run flow.

For this walkthrough, you choose on-demand job execution for ease of understanding. In practice, you can choose a scheduled job and periodically extract only newly added data. The Amazon S3 event trigger also helps you transform data automatically.
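
If you later want to start the same flow from a script or another service, the Amazon AppFlow API exposes a StartFlow operation. The following boto3 sketch assumes the flow name used in this walkthrough:

import boto3

# 'my-ga-flow' is the on-demand flow created earlier in this walkthrough
appflow = boto3.client('appflow')
response = appflow.start_flow(flowName='my-ga-flow')
print(response)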

Querying via Athena

You need to create an external table before querying. Complete the following steps:

  1. On the Athena console, enter create database appflow_data into the query editor.
  2. Choose Run query.
  3. Enter the following command in the query editor (replace the target bucket name appflow-ga-sample with your own bucket):
CREATE EXTERNAL TABLE appflow_data.ga_sample (
  `year` int,
  `month` int,
  `day` int,
  `hour` int,
  `device` string,
  `user_count` int
 )
STORED AS PARQUET
LOCATION 's3://appflow-ga-sample/ga-data'
tblproperties ("parquet.compression"="SNAPPY")
;
  4. Choose Run query.

Now you can query the Google Analytics data. Enter the following query and run it. This query shows which kinds of devices are most commonly used to access your website, aggregated by day:

SELECT
  year
  , month
  , day
  , device
  , count(user_count) as cnt
FROM
  appflow_data.ga_sample
GROUP BY
  year
  , month
  , day
  , device
ORDER BY
  cnt DESC
LIMIT 10
; 

The following screenshot shows the query results.
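
If you want to run the same query outside the console, you can also call the Athena API directly. The following boto3 sketch reuses the database and table from this walkthrough; the results prefix in the output location is a placeholder.

import boto3

athena = boto3.client('athena')

# Database and table match the walkthrough; the results prefix is a placeholder
response = athena.start_query_execution(
    QueryString='SELECT device, count(user_count) AS cnt FROM appflow_data.ga_sample GROUP BY device ORDER BY cnt DESC',
    QueryExecutionContext={'Database': 'appflow_data'},
    ResultConfiguration={'OutputLocation': 's3://appflow-ga-sample/athena-results/'}
)
print(response['QueryExecutionId'])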

Summary

This post demonstrated how you can transfer Google Analytics data to Amazon S3 using Amazon AppFlow and analyze it with Amazon Athena. You no longer need to build your own application to extract data from Google Analytics and other SaaS applications. Amazon AppFlow enables you to develop a fully automated data transfer and transformation workflow and an integrated query environment in one place.


About the Author

Makoto Shimura is a specialist solutions architect, analytics at Amazon Web Services. He helps customers develop efficient data pipelines on the AWS platform. Previously, he worked as a data engineer, developing a distributed data platform. Outside of work, he loves to spend time with his family, play with his dog, and also play video games.

 

Setting up trust between ADFS and AWS and using Active Directory credentials to connect to Amazon Athena with ODBC driver

Post Syndicated from Arun Alapati original https://aws.amazon.com/blogs/big-data/setting-up-trust-between-adfs-and-aws-and-using-active-directory-credentials-to-connect-to-amazon-athena-with-odbc-driver/

Amazon Athena is a serverless and interactive query service that allows you to easily analyze your raw and processed datasets in Amazon Simple Storage Service (Amazon S3) using standard SQL. The JDBC and ODBC drivers that Athena provides allow you to easily integrate your data analytics tools (such as Microsoft Power BI, Tableau, or SQLWorkBench) with Athena seamlessly and gain insights about your data in minutes.

Before November 2018, you had to connect to Athena with ODBC or JDBC drivers using your IAM user or role credentials. However, with the November 20, 2018 release of support for Microsoft Active Directory Federation Services (ADFS 3.0) and Security Assertion Markup Language (SAML 2.0) in the Athena ODBC/JDBC driver, you can now connect to Athena directly using your Microsoft Active Directory (AD) credentials.

Microsoft ADFS 3.0, a component of Windows Server, supports SAML 2.0 and is integrated with AWS Identity and Access Management (IAM). This integration allows Active Directory (AD) users to federate to AWS using corporate directory credentials, such as a username and password from Microsoft Active Directory.

This post walks you through configuring ADFS 3.0 on a Windows Server 2012 R2 Amazon Elastic Compute Cloud (Amazon EC2) instance and setting up trust between the ADFS 3.0 IdP and AWS through SAML 2.0. The post then demonstrates how to install the Athena ODBC driver on an Amazon EC2 Linux (RHEL) instance and configure it to use ADFS for authentication.

Solution overview

The following architecture diagram shows how an AD user in your organization is authenticated in the Athena ODBC/JDBC driver:

The process includes the following steps:

  1. A user in your organization uses a client application with the JDBC or ODBC driver to request authentication from your organization’s IdP. The IdP is ADFS 3.0.
  2. The IdP authenticates the user against AD, which is your organization’s Identity Store.
  3. The IdP constructs a SAML assertion with information about the user and sends the assertion to the client application via the JDBC or ODBC driver.
  4. The JDBC or ODBC driver calls the AWS Security Token Service AssumeRoleWithSAML API operation, passing it the following parameters:
    • The ARN of the SAML provider
    • The ARN of the role to assume
    • The SAML assertion from the IdP
  5. The API response to the client application via the JDBC or ODBC driver includes temporary security credentials.
  6. The client application uses the temporary security credentials to call Athena API operations, which allows your users to access Athena API operations.
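
Steps 4 and 5 correspond to a single AWS Security Token Service call, and step 6 uses the temporary credentials it returns. The following boto3 sketch is purely illustrative of what the driver does on your behalf: the ARNs match the examples created later in this post, and the assertion value is a placeholder.

import boto3

# Placeholder: base64-encoded SAML assertion returned by ADFS (step 3)
saml_assertion = '<base64-encoded SAML assertion from ADFS>'

sts = boto3.client('sts')
response = sts.assume_role_with_saml(
    RoleArn='arn:aws:iam::123456789012:role/ArunADFSTest',
    PrincipalArn='arn:aws:iam::123456789012:saml-provider/MytestADFS',
    SAMLAssertion=saml_assertion,
)

# Temporary credentials used for subsequent Athena API calls (step 6)
creds = response['Credentials']
athena = boto3.client(
    'athena',
    aws_access_key_id=creds['AccessKeyId'],
    aws_secret_access_key=creds['SecretAccessKey'],
    aws_session_token=creds['SessionToken'],
)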

This post walks you through configuring ADFS-AWS trust through SAML and using that trust to federate AD users in the Athena JDBC/ODBC driver.

To implement this solution, you complete the following steps:

  • Configure ADFS (3.0) on a Windows Server 2012 R2 Amazon EC2 instance
  • Set up trust between AWS and ADFS (3.0) through SAML 2.0 rules
  • Install the Athena ODBC driver 1.0.5 on RHEL EC2 instance and configure it to use ADFS

Prerequisites

For this walkthrough, you need to have the following prerequisites:

  • An understanding of the concepts of Active Directory. The steps for configuring Active Directory on a Windows instance are outside the scope of this post.
  • An understanding of IAM roles and concepts.
  • DNS and networking set up between your Active Directory server and the instance on which the Athena ODBC or JDBC driver is installed. This post sets up Active Directory (which runs ADFS) on a Windows Amazon EC2 instance and the ODBC driver on another EC2 instance, both in the same AWS VPC and subnet. However, for your use case, you need to provide that connectivity between the ADFS server and the ODBC/JDBC instance.

Configuring ADFS (3.0) on a Windows Server 2012 R2 Amazon EC2 instance

In the following steps, you install ADFS 3.0 on a Windows Server 2012 R2 Amazon EC2 instance. As per the prerequisites, you already installed Active Directory on a Windows Server 2012 R2 EC2 instance. For this post, the domain name is arunad.local. For instructions on setting up an Active Directory domain controller on an EC2 instance, see Building Your First Domain Controller on 2012 R2 on the Microsoft TechNet website.

Installing prerequisites for ADFS 3.0

To configure ADFS 3.0 on a Windows domain controller, you must have the following:

  • An SSL certificate – For this post, you can create a self-signed certificate by installing IIS (Internet Information Services)
  • A service account – Create an Active Directory user and add it to the Domain Admins group
  1. Install IIS on Windows Server 2012 R2 Amazon EC2 instance. For instructions, see How to install and configure IIS on Windows Server 2012 R2 on The Solving website. For this post, you can skip Step 2 in the preceding instructions.
  2. After you install IIS, create a self-signed certificate. For instructions, see How to Create a Self Signed SSL Certificate with Windows Server on the Sophos Community website. For this post, you can skip the step about binding the self-signed certificate in the preceding instructions.

To configure a service account in your domain controller, you create a user in your Active Directory with the name ADFSSVC and add the user to the Domain Admins group.

  1. Open Server Manager.
  2. Choose Tools.
  3. Choose Active Directory Users and Computers.
  4. Expand your domain (arunad.local).
  5. Choose User (right-click).
  6. Choose New.
  7. Choose User.
  8. Create a user with the name ADFSSVC.
  9. Set the password to never expire.

You can now add user ADFSSVC to the domain admins group.

  1. Choose Users.
  2. Choose ADFSSVC (right-click) and choose Add to group.
  3. In the search bar, enter domain.
  4. Choose checknames.
  5. Choose Domain Admins.
  6. Choose OK.

You receive a message that the user is added to the group, but should still verify it.

  1. Choose ADFSSVC (right-click) and choose Properties.
  2. On the Member Of tab, check that Domain Admins is listed.

Installing and configuring ADFS 3.0

Now that you have installed the prerequisites for ADFS 3.0, you can install and configure ADFS 3.0 on Windows Server 2012 R2 EC2 instance.

  1. Open Server Manager.
  2. Choose Roles and Features.
  3. Select Role-based or feature-based installation.

  1. Choose Next until you reach the Select server roles page.
  2. For Roles, select Active Directory Federation Services.

  1. Choose Next until you reach the Confirm installation selections page.
  2. Choose Install.

  1. Choose Configure the Federation Service for this server.
  2. Select Create the first federation server in a federation server farm.
  3. Choose Next.

  1. Choose Next until you reach the Specify Service Properties page.
  2. For SSL Certificate, choose the self-signed certificate you installed earlier.
  3. For Federation Service Display Name, enter ArunADFS.
  4. Choose Next.

  1. On the Specify Service Account page, select Use an existing domain user account or group Managed Service Account.
  2. Choose Select.

  1. In the text box, enter ADFSSVC.
  2. Choose Check names.
  3. When the name is populated, choose OK.

  1. Enter your password and choose Next.

  1. Select Create a database on this server using Windows Internal Database.
  2. Choose Next.

  1. Choose Next until you reach the Pre-requisite Checks page.
  2. Choose Configure.

When the server is successfully configured, you may see the following warning message: 

An error occurred during an attempt to set the SPN for the specified service account. Set the SPN for the service account manually. For more information about setting the SPN of the service account manually, see the AD FS Deployment Guide. Error message: The SPN required for this Federation Service is already set on another Active Directory account. Choose a different Federation Service name and try again.

To fix the problem, run the following command by opening PowerShell as an administrator:

setspn -a host/localhost adfssvc

The following code shows the output.

The ADFS 3.0 configuration is now complete.

  1. To download your ADFS server’s federation XML file, open a browser on your Windows Server and enter the following address: https://<yourservername>/FederationMetadata/2007-06/FederationMetadata.xml.

This file is required to set up trust between ADFS and AWS.

Alternatively, you can download the ADFS server’s federation XML file by running the following command as administrator in PowerShell 3.0+:

wget https://<your-server-name>/FederationMetadata/2007-06/FederationMetadata.xml -OutFile FederationMetadata.xml

You can find your server name by opening the Start menu (Windows icon), right-clicking My Computer, choosing Properties, and checking the Full computer name field.

Copy the downloaded XML file on to your local machine so you can use it when creating a SAML-based role in IAM in the next step.

Establishing trust between Windows AD (using ADFS IDP) and AWS via SAML 2.0

Now that you have configured the ADFS 3.0 on a Windows Server, you can establish the trust between AWS and the IdP (ADFS) via SAML assertion claim rules. By establishing this trust, users in your AD can federate into AWS using an IAM role and access AWS resources such as Athena or the AWS Glue Data Catalog.

Setting up this trust requires configuration in both AWS and Active Directory. For AWS, you set up IAM roles and establish a SAML provider. For Active Directory, you write the SAML assertion and claim rules.

Setting up your SAML provider in IAM

To set up your SAML provider, complete the following steps:

  1. On the IAM console, choose Identity providers.
  2. Choose Create provider.
  3. For Provider Type, choose SAML.
  4. For Provider Name, enter MytestADFS.
  5. For Metadata Document, choose the XML file you downloaded earlier.

  6. Create a new role in IAM and choose the trusted entity as SAML 2.0 federation.
  7. For SAML provider, choose the provider you created earlier (MytestADFS).
  8. For Attribute, select SAML:aud.

  9. Add the necessary IAM permissions to this role and create the IAM role.

For this post, attach the AmazonAthenaFullAccess managed policy and name the role ArunADFSTest.

The role name you give in this step is crucial because any users and groups you create in Active Directory as part of establishing trust in the following steps are based on this name.

Configuring the SAML assertion rules in ADFS 3.0 IdP

In this step, you configure the SAML assertion rules in your ADFS so that users can federate into AWS using the IAM role you created.

Determining how you create and delineate your AD groups and IAM roles in AWS is crucial in how you secure access to your account and manage resources. One approach for creating the AD groups that uniquely identify the IAM role mapping is by selecting a common group naming convention.

For this post, create a group in Active Directory with the naming convention AWS-<AccountID>-<IAMRolename>; for example, AWS-123456789012-ArunADFSTest.

This naming convention is extremely crucial in the next steps because you write SAML assertion claim rules where you fetch all the AD groups of your AD user that contain the string AWS-<AccountID>- in them and extract the last portion of the group name to map it with the IAM role in AWS. For example, if the AD user that you’re authenticating to AWS is part of the AD group AWS-123456789012-ArunADFSTest, then the claim rules, which you write later, find all groups of the AD user that match the string AWS-123456789012-, extract the last portion of the group name (ArunADFSTest), and send it as the role name to the AWS SAML endpoint in the format arn:aws:iam::<AccountID>:role/ArunADFSTest.
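
The following Python snippet is an illustration only; the real transformation is written later in the ADFS claim rule language. It simply shows the string mapping that the claim rules perform on a matching group name, using the account ID and provider name from this post as example values.

# Illustration only: conceptually maps an AD group name to the Role attribute value AWS expects
def group_to_saml_role(group_name, account_id='123456789012', saml_provider='MytestADFS'):
    # 'AWS-123456789012-ArunADFSTest' -> role name 'ArunADFSTest'
    prefix = 'AWS-{}-'.format(account_id)
    role_name = group_name[len(prefix):]
    role_arn = 'arn:aws:iam::{}:role/{}'.format(account_id, role_name)
    provider_arn = 'arn:aws:iam::{}:saml-provider/{}'.format(account_id, saml_provider)
    return '{},{}'.format(role_arn, provider_arn)

print(group_to_saml_role('AWS-123456789012-ArunADFSTest'))
# arn:aws:iam::123456789012:role/ArunADFSTest,arn:aws:iam::123456789012:saml-provider/MytestADFS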

  1. In Server Manager, under Tools, choose Active directory users and computers.
  2. Choose your domain (right-click) and choose New.
  3. Choose Group.

The following screenshot shows creating an AD group with name AWS-123456789012-ArunADFSTest:

  1. After you create the group in AD with name AWS-123456789012-ArunADFSTest, create a new user in that group. For this post, name the user myldapuser1.

  1. Make sure the E-mail field of the user contains a valid email address, because you use this field and pass it as the RoleSessionName to AWS when constructing the SAML token.

  1. After you create the user, add the user to the AD group AWS-123456789012-ArunADFSTest.

Now that you’ve created the AD groups, AD users, and IAM roles, you create the relying party trust in ADFS and write the claim rules. The ADFS IdP needs to construct the following values in the SAML assertion and send the values to AWS for authentication:

    • NameID
    • RoleSessionName
    • Roles (which contains your SAML IDP in AWS and role name)

For instructions on setting up the relying trust and claim rules in ADFS, see AWS Federated Authentication with Active Directory Federation Services (AD FS). For this walkthrough, you can start at the Active Directory Federation Services Configuration section.

  1. For Display name, enter My Amazon Portal.

After the configuration, your claim rules looks similar to the following screenshots.

The following screenshot shows the rules for NameID.

The following screenshot shows the rules for RoleSessionName.

The following screenshot shows the rules for Get AD Groups.

The following screenshot shows the rules for Roles. The SAML provider in IAM was created with the name MytestADFS (ARN arn:aws:iam::123456789012:saml-provider/MytestADFS), so you need to use that same value (MytestADFS) here in the claim script. If you used a different name, replace it with your IdP ARN name.

After you create these four rules, your ADFS relying trust setup is complete.

Verifying your IdP

To verify that you set up your IdP successfully, complete the following steps:

  1. Navigate to the following URL in the browser on your ADFS server Windows instance (use your Windows Server hostname): https://<windows-hostname>/adfs/ls/IdpInitiatedSignOn.aspx.

  1. Select your ADFS display name (My Amazon Portal).

  1. Enter the AD credentials for the user myldapuser1 you created earlier.

Because the domain name for this demo is arunad.local, specify the user name as ARUNAD\myldapuser1, or you can specify it as myldapuser1@arunad.local.

If everything is successful, you should be able to sign in to the AWS Management Console.

If you encounter any errors, review the preceding steps. For more information about common errors with SAML, see Troubleshooting SAML 2.0 Federation with AWS. For additional information about troubleshooting, see How to View a SAML Response in Your Browser for Troubleshooting.

The Athena ODBC/JDBC driver when initiating connection to your ADFS server also uses the same federation URL (https://<windows-hostname>/adfs/ls/IdpInitiatedSignOn.aspx) for retrieving the SAML assertion AWS properties.

Installing the Athena ODBC driver 1.0.5 on an Amazon EC2 Linux instance and configuring it to use ADFS

Now that you have configured ADFS 3.0 and set up trust with AWS, the last step is to configure the Athena ODBC driver to use ADFS as its authentication mechanism.

As per the prerequisites, you launched the Active Directory Windows EC2 instance and the Athena ODBC driver Linux (RHEL) EC2 instance in the same VPC and subnet and allowed all traffic between both instances. However, in your environment, you need to make sure your ADFS server can communicate with the machine that has Athena JDBC/ODBC installed.

For this post, the domain name servers in the DHCP option set of VPC (in which you launch the Linux EC2) are modified as follows: 172.31.X.X, AmazonProvidedDNS, where 172.31.X.X is the IP address of the EC2 Windows instance on which ADFS is configured with the domain name arunad.local.

Setting up the environment on your EC2 instance

To set up your environment, complete the following steps:

  1. Launch a Linux EC2 instance with an AMI that runs the Red Hat Enterprise Linux distribution (for example, AMI with RHEL-7.6_HVM_GA-20181017-x86_64-0-Hourly2-GP2).
  2. SSH into the instance and enter the following commands:
sudo yum install telnet
sudo yum install nc
sudo yum install gcc
sudo yum install wget
sudo yum install vim
  1. Verify connectivity between your ADFS server and RHEL EC2 instance with a networking tool of your choice.

This post uses the ping utility. The following code shows the output:

  1. Install the open-ldap client and test if you can search for a user in AD from this Linux instance (replace the values with your user and domain name):
sudo yum install openldap-clients
ldapsearch -h arunad.local -p 389 -D "CN=myldapuser1,CN=Users,DC=arunad,DC=local" -x -W -b "DC=arunad,DC=local"

If these commands are successful, the RHEL EC2 instance can communicate with the AD server and retrieve the credentials.

Installing and configuring UnixODBC Driver Manager 2.3.4

The Athena ODBC driver on Linux requires you to have one of the following ODBC Driver Managers installed to set up the connection:

  • iODBC 3.52.9, 3.52.10, 3.52.11, or 3.52.12
  • unixODBC 2.3.2,2.3.3, or 2.3.4
  1. Install the UnixODBC driver manager 2.3.4 on your RHEL EC2 instance. For instructions, see unixODBC-2.3.4 on the Beyond Linux From Scratch website.

The command needed for installation on your EC2 instance should look similar to the following code:

wget ftp://ftp.unixodbc.org/pub/unixODBC/unixODBC-2.3.4.tar.gz
tar -zxvf unixODBC-2.3.4.tar.gz
cd unixODBC-2.3.4
./configure --prefix=/usr --sysconfdir=/etc/unixODBC && make
  1. Install the ODBC Driver Manager by switching to the root user. See the following code:
sudo -su root

make install &&

find doc -name "Makefile*" -delete                &&
chmod 644 doc/{lst,ProgrammerManual/Tutorial}/*   &&

install -v -m755 -d /usr/share/doc/unixODBC-2.3.4 &&
cp      -v -R doc/* /usr/share/doc/unixODBC-2.3.4

  1. After the ODBC Driver Manager is installed, make sure the following files are present, which indicate a successful installation of the ODBC driver manager on an RHEL EC2 instance:
    • /etc/unixODBC/odbcinst.ini
    • /etc/unixODBC/odbc.ini
  1. The ODBC Driver Manager library files created as part of the installation are present in the path /usr/lib. Set the shared library path to point to your ODBC Driver Manager libraries by entering the following command as a non-root user:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib

  1. Verify the ODBC Driver Manager environment configuration is loaded properly by entering the code odbcinst -j.

Installing and configuring the Simba Athena ODBC Driver Manager (64 bit)

Now that the ODBC driver manager is configured, the final step is to install the Athena ODBC driver 1.0.5 on this Linux Instance and configure it to use ADFS as the authentication mechanism.

  1. To install the Athena ODBC driver on this EC2 instance, enter the following code:
wget https://s3.amazonaws.com/athena-downloads/drivers/ODBC/SimbaAthenaODBC_1.0.5/Linux/simbaathena-1.0.5.1006-1.x86_64.rpm

sudo yum --nogpgcheck localinstall simbaathena-1.0.5.1006-1.x86_64.rpm

The example ODBC configuration file of the downloaded driver is included in the path /opt/simba/athenaodbc/Setup/odbc.ini.

  1. To configure the ODBC driver to use ADFS as an authentication mechanism, log in as ec2-user and enter the following code:

vim .odbc.ini

  1. Insert the following configuration directives:
[ODBC]
Trace=no

[ODBC Data Sources]
Simba Athena 64-bit=Simba Athena ODBC Driver 64-bit


[Simba Athena 64-bit]
Description=Simba Athena ODBC Driver (64-bit) DSN
Driver=/opt/simba/athenaodbc/lib/64/libathenaodbc_sb64.so


# Connection configurations should be set here.
AwsRegion=us-west-2
Schema=default
S3OutputLocation=s3://aws-athena-query-results-123456780912-us-west-2
AuthenticationType=ADFS
UID=ARUNAD\myldapuser1
PWD=XXXXXXXX
IdP_Host=win-qikm653mpj9.arunad.local
IdP_Port=443
SSL_Insecure=true

Replace the values for IdP_Host and IdP_Port to point to your ADFS server. For this post, these values are win-qikm653mpj9.arunad.local and 443. Similarly, replace UID and PWD with the LDAP user name and password you created earlier. This post uses ARUNAD\myldapuser1.  Also,  replace AwsRegion and S3OutputLocation values according to your environment. For production workloads, make sure that you set SSL_Insecure to false so the driver can verify the server certificate.

You can retrieve the HTTPS port number of your ADFS server by entering the following code on the Windows AD server instance’s power shell:

Get-AdfsProperties

  1. To enable DEBUG level logging on your Athena ODBC driver, edit the file /opt/simba/athenaodbc/lib/64/simba.athenaodbc.ini and set the following values:
[Driver]
## - Note that this default DriverManagerEncoding of UTF-32 is for iODBC.
ErrorMessagesPath=/opt/simba/athenaodbc/ErrorMessages
LogLevel=5
LogPath=/home/ec2-user/odbclogs/
SwapFilePath=/tmp
  1. Now that the Athena ODBC driver is configured, you can test it by entering the following code:

isql -v "Simba Athena 64-bit"

In the preceding code, Simba Athena 64-bit refers to the name of your DSN you specified in /home/ec2-user/.odbc.ini while connecting to the ODBC driver.

If you’re connected, it means you have successfully connected the Athena ODBC driver manager by authenticating your user against ADFS.

You can also check the connection log to verify the connection URI used by your driver and values returned by ADFS to the Athena ODBC driver.

Entries from the connection log snippet look as follows:

May 09 01:40:58.761 DEBUG 50743104 IAMAdfsCredentialsProvider::FormBasedAuthentication: verifySSL: false
May 09 01:40:58.761 DEBUG 50743104 IAMAdfsCredentialsProvider::FormBasedAuthentication: Using URI: https://win-qikm653mpj9.arunad.local:443/adfs/ls/IdpInitiatedSignOn.aspx?loginToRp=urn:amazon:webservices
May 09 01:40:58.821 DEBUG 50743104 IAMSamlPluginCredentialsProvider::GetAWSCredentialsWithSaml: Using RoleArn: arn:aws:iam::143280751103:role/ArunADFSTest, PrincipalArn: arn:aws:iam::143280751103:saml-provider/MytestADFS

Conclusion

This post demonstrated how to configure ADFS 3.0 on your Active Directory and use it as an IdP to federate into AWS using SAML. This post also showed how you can integrate your Athena ODBC driver to ADFS and use your Active Directory credentials directly to connect to Athena. Integrating your Active Directory with the Athena ODBC driver gives you the flexibility to access Athena from BI tools you’re already familiar with and analyze the data in Amazon S3 using SQL, without needing to create separate IAM users.

If your organization has single sign-on (SSO) into AWS enabled with the Okta identity provider, you can use the latest version of the Athena JDBC driver, version 2.0.9, to use Okta as the authentication mechanism. For more information, see the documentation on using Okta with the Athena JDBC driver.

If you have any questions or feedback, please leave a comment.

 


About the Author

Alapati Arun is a Cloud Support Engineer with AWS based out of Dallas. He focuses on supporting customers in using big data technologies. He enjoys travel and watching movies.

 

Measure Effectiveness of Virtual Training in Real Time with AWS AI Services

Post Syndicated from Rajeswari Malladi original https://aws.amazon.com/blogs/architecture/measure-effectiveness-of-virtual-training-in-real-time-with-aws-ai-services/

According to International Data Corporation (IDC), worldwide spending on digital transformation will reach $2.3 trillion in 2023. As organizations adopt digital transformation, training becomes an important aspect of this journey. Whether these are internal trainings to upskill an existing workforce or packaged content for commercial use, these trainings need to be efficient and cost effective. With the advent of education technology, it is common practice to deliver trainings via digital platforms. This makes them accessible to a larger population and is cost effective, but it is important that the trainings remain interactive and effective. According to a recent article published by Forbes, immersive education and data-driven insights are among the top five Education Technology (EdTech) innovations. These are the key characteristics of an effective training experience.

An earlier blog series explored how to build a virtual trainer on AWS using Amazon Sumerian. This series illustrated how to easily build an immersive and highly engaging virtual training experience without needing additional devices or a complex virtual reality platform management. These trainings are easy to maintain and are cost effective.

In this blog post, we will further extend the architecture to gather real-time feedback about the virtual trainings and create data-driven insights to measure its effectiveness with the help of Amazon artificial intelligence (AI) services.

Architecture and its benefits

Virtual training on AWS and AI Services – Architecture

Consider a scenario where you are a vendor in the health care sector. You’ve developed a cutting-edge device, such as patient vital monitoring hardware that goes through frequent software upgrades and it is about to be rolled out across different U.S. hospitals. The nursing staff needs to be well trained before it can begin using the device. Let’s take a look at an architecture to solve this problem. We will first explain the architecture for building the training and then we will show how we can measure its effectiveness.

At the core of the architecture is Amazon Sumerian. Sumerian is a managed service that lets you create and run 3D, Augmented Reality (AR), and Virtual Reality (VR) applications. Within Sumerian, real-life scenes from a hospital environment can be created by importing the assets from the assets library. Scenes consist of host(s) and an AI-driven animated character with built-in animation, speech, and behavior. The hosts act as virtual trainers that interact with the nursing staff. The speech component assigns text to the virtual trainer for playback with Amazon Polly. Polly helps convert training content from Sumerian to life-like speech in real time and ensures the nursing staff receives the latest content related to the equipment on which it’s being trained.

The nursing staff accesses the training via web browsers on iOS or Android mobile devices or laptops, and authenticates using Amazon Cognito. Cognito is a service that lets you easily add user sign-up and authentication to your mobile and web apps. Sumerian then uses the Cognito identity pool to create temporary credentials to access AWS services.

The flow of the interactions within Sumerian is controlled using a visual state machine in the Sumerian editor. Within the editor, the dialogue component assigns an Amazon Lex chatbot to an entity, in this case the virtual trainer or host. Lex is a service for building conversational interfaces with voice and text. It provides you the ability to have interactive conversations with the nursing staff, understand its areas of interest, and deliver appropriate training material. This is an important aspect of the architecture where you can customize the training per users’ needs.

Lex has native interoperability with AWS Lambda, a serverless compute offering where you just write and run your code in Lambda functions. Lambda can be used to validate user inputs or apply any business logic, such as fetching the user selected training material from Amazon DynamoDB (or another database) in real time. This material is then delivered to Lex as a response to user queries.

You can extend the state machine within the Sumerian editor to introduce new interactive flows to collect user feedback. Amazon Lex collects user feedback, which is saved in Amazon Simple Storage Service (S3) and analyzed by Amazon Comprehend. Amazon Comprehend is a natural language processing service that uses AI to find meaning and insights/sentiments in text. Insights from user feedback are stored in S3, which is a highly scalable, durable, and highly available object storage.
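
As a simple illustration of the Comprehend call involved, the following boto3 sketch analyzes one feedback string. The text is a made-up example; in the architecture above, the input would come from the feedback objects that Amazon Lex stores in Amazon S3.

import boto3

comprehend = boto3.client('comprehend')

# Placeholder feedback text; in practice this would be read from the feedback object in S3
feedback_text = 'The device walkthrough was clear, but the alarm configuration section felt rushed.'

result = comprehend.detect_sentiment(Text=feedback_text, LanguageCode='en')
print(result['Sentiment'], result['SentimentScore'])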

You can analyze the insights from user feedback using Amazon Athena, an interactive query service which analyzes data in S3 using standard SQL. You can then easily build visualizations using Amazon QuickSight.

By using this architecture, you not only deliver the virtual training to your nursing staff in an immersive environment created by Amazon Sumerian, but you can also gather the feedback interactively. You can gain insights from this feedback and iterate over it to make the training experience more effective.

Conclusion and next steps

In this blog post we reviewed the architecture to build interactive trainings and measure their effectiveness. The serverless nature of this architecture makes it cost effective, agile, and easy to manage, and you can apply it to a number of use cases. For example, an educational institution can develop training content designed for multiple learning levels and the training level can be adjusted in real time based on live interactions with the students. In the manufacturing scenario, you can build a digital twin of your process and train your resources to handle different scenarios with full interactions. You can integrate AWS services just like Lego blocks, and you can further expand this architecture to integrate with Amazon Kendra to build interactive FAQ or integrate with Amazon Comprehend Medical to build trainings for the healthcare industry. Happy building!

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and the impact of certain design decisions. To address the need to provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction to its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come online, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.



About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

New – Announcing Amazon AppFlow

Post Syndicated from Martin Beeby original https://aws.amazon.com/blogs/aws/new-announcing-amazon-appflow/

Software as a service (SaaS) applications are becoming increasingly important to our customers, and adoption is growing rapidly. While there are many benefits to this way of consuming software, one challenge is that data is now living in lots of different places. To get meaningful insights from this data, we need to have a way to analyze it, and that can be hard when our data is spread out across multiple data islands.

Developers spend huge amounts of time writing custom integrations so they can pass data between SaaS applications and AWS services for analysis; these integrations can be expensive and often take months to complete. If data requirements change, costly and complicated modifications have to be made to the integrations. Companies that don’t have the luxury of engineering resources might find themselves manually importing and exporting data from applications, which is time-consuming, risks data leakage, and has the potential to introduce human error.

Today it is my pleasure to announce a new service called Amazon AppFlow that will solve this issue. Amazon AppFlow allows you to automate the data flows between AWS services and SaaS applications such as Salesforce, Zendesk, and ServiceNow. SaaS application administrators, business analysts, and BI specialists can quickly implement most of the integrations they need without waiting months for IT to finish integration projects.

As well as allowing data to flow in from SaaS applications to AWS services, it’s also capable of sending data from AWS services to SaaS applications. Security is our top priority at AWS, and so all of the data is encrypted while in motion. Some of the SaaS applications have integrated with AWS PrivateLink; this adds an extra layer of security and privacy. When the data flows between the SaaS application and AWS with AWS PrivateLink, the traffic stays on the Amazon network rather than using the public internet. If the SaaS application supports it, Amazon AppFlow automatically takes care of this connection, making private data transfer easy for everyone and minimizing the threat from internet-based attacks and the risk of sensitive data leakage.

You can run the data transfer on a schedule, in response to a business event, or on demand, giving you speed and flexibility with your data sharing.

To show you the power of this new service, I thought it would be interesting to show you how to set up a simple flow.

I run a Slack workspace in the United Kingdom and Ireland for web community organizers. Since Slack is one of the supported SaaS applications in Amazon AppFlow, I thought it would be nice to try and import some of the conversation data into S3. Once it was in S3 I could then start to analyze it using Amazon Athena and then ultimately create a visualization using Amazon QuickSight.

To get started, I go to the Amazon AppFlow console and click the Create Flow button.

In the next step, I enter the Flow name and a Flow description. There are also some options for data encryption. By default, all data is encrypted in transit, and in the case of S3 it’s also encrypted at rest. You have the option to supply your own encryption key, but for this demo, I’m just going to use the key in my account that is used by default.

On this step you are also given the option to enter Tags for the resource. I have been getting into the habit of tagging demo infrastructure in my account as Demo, which makes it easier for me to know which resources I can delete.

On the next step, I select the source of my data. I pick Slack and go through the wizard to establish a connection with my Slack workspace. I also get to choose what data I want to import from my Slack workspace. I select the Conversations object in the general Slack channel. This will import any messages that are posted to the general channel and then send them to the destination that I configure next.

There are a few destinations that I can pick, but to keep things simple, I ask for the data to be sent to an S3 bucket. I also set the frequency that I want to fetch the data on this step. I want the data to be retrieved every hour, so I select Run the flow on schedule and make the necessary configurations. Slack can be triggered on demand or on schedule; some other sources can be triggered by specific events, such as converting a lead in Salesforce.

The next step is to map the data fields. I am just going to go with the defaults, but you could customize this and combine fields or take only the specific fields required for analysis.

Now that the flow has been created and activated, it runs automatically every hour, adding new data to my S3 bucket.

I won’t go into the specifics of Amazon Athena or Amazon QuickSight, but I used both of these AWS services to take my data stored in S3 and produce a word cloud of the most common words that are used in my Slack channel.

The cool thing about Athena is that you can run SQL queries directly over the encrypted data in S3 without needing any additional data warehouse. You can see the results in the image below. I could now easily share this as a dashboard with anyone in my organization.
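
If you want to script that step rather than use the Athena console, a query along these lines could be submitted from Python with boto3. This is only a sketch: it assumes the exported Slack data has already been catalogued as a table, and the slack_demo_db database, slack_conversations table, text column, and results bucket below are hypothetical names standing in for whatever exists in your account.

    import boto3

    athena = boto3.client("athena")

    # Hypothetical word-count query over the exported Slack messages:
    # split each message into words and count how often each one appears.
    QUERY = """
    SELECT lower(word) AS word, count(*) AS occurrences
    FROM slack_conversations
    CROSS JOIN UNNEST(split(text, ' ')) AS t (word)
    GROUP BY lower(word)
    ORDER BY occurrences DESC
    LIMIT 100
    """

    response = athena.start_query_execution(
        QueryString=QUERY,
        QueryExecutionContext={"Database": "slack_demo_db"},
        ResultConfiguration={"OutputLocation": "s3://my-athena-results-bucket/appflow-demo/"},
    )
    print("Query execution ID:", response["QueryExecutionId"])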

Amazon AppFlow is launching today with support for S3 and 13 SaaS applications as sources of data, and S3, Amazon Redshift, Salesforce, and Snowflake as destinations, and you will see us add hundreds more as the service develops.

The service automatically scales up or down to meet the demands you place on it. It also allows you to transfer 100 GB in a single flow, which means you don’t need to break data down into batches. You can trust Amazon AppFlow with your most valuable data, as we have architected it to be highly available and resilient.

Amazon AppFlow is available from today in US East (Northern Virginia), US East (Ohio), US West (Northern California), US West (Oregon), Canada (Central), Asia Pacific (Singapore), Asia Pacific (Tokyo), Asia Pacific (Sydney), Asia Pacific (Seoul), Asia Pacific (Mumbai), Europe (Paris), Europe (Ireland), Europe (Frankfurt), Europe (London), and South America (São Paulo), with more regions to come.

Happy Data Flowing

— Martin

Serving Billions of Ads in Just 100 ms Using Amazon ElastiCache for Redis

Post Syndicated from Rodrigo Asensio original https://aws.amazon.com/blogs/architecture/serving-billions-of-ads-with-amazon-elasticache-for-redis/

This post was co-written with Lucas Ceballos, CTO of Smadex

Introduction

Showing ads may seem to be a simple task, but it’s not. Showing the right ad to the right user is an incredibly complex challenge that involves multiple disciplines such as artificial intelligence, data science, and software engineering. Doing it one million times per second with a 100-ms constraint is even harder.

In the ad-tech business, speed and infrastructure costs are the keys to success. The less the final user waits for an ad, the higher the probability of that user clicking on the ad. Doing that while keeping infrastructure costs under control is crucial for business profitability.

About Smadex

Smadex is the leading mobile-first programmatic advertising platform, specifically built to deliver the best user acquisition performance and complete transparency.

Its state-of-the-art demand-side platform (DSP) technology provides advertisers with the tools they need to achieve their goals and ROI, with measurable results from web forms, post-app install events, store visits, and sales.

Smadex advertising architecture

What does showing ads look like under the hood? At Smadex, our technology works based on the OpenRTB (Real-Time Bidding) protocol.

RTB is a means by which advertising inventory is bought and sold on a per-impression basis, via programmatic instantaneous auction, which is similar to financial markets.

To show ads, we participate in auctions, deciding in real time which ad to show and how much to bid, trying to optimize the cost of every impression.

High level diagram

  1. The final user browses the publisher’s website or app.
  2. Ad-exchange is called to start a new auction.
  3. Smadex receives the bid request and has to decide which ad to show and how much to offer in just 100 ms (and this is happening one million times per second).
  4. If Smadex won the auction, the ad must be sent and rendered on the publisher’s website or app.
  5. In the end, the user interacts with the ad, sending new requests and events to the Smadex platform.

Flow of data

As you can see in the previous diagram, showing ads is just one part of the challenge. After the ad is shown, the final user interacts with it in multiple ways, such as clicking it, installing an application, subscribing to a service, etc. This happens during a determined period that we call the “attribution window.” All of those interactions must be tracked and linked to the original bid transaction (using the request_id parameter).

Doing this is complicated: billions of bid transactions must be stored and available so that they can be quickly accessed every time the user interacts with the ad. The longer we store the transactions, the longer we can “wait” for an interaction to take place, and the better for our business and our clients, too.

Detailed diagram

Challenge #1: Cost

The challenge is: What kind of database can store billions of records per day, with at least a 30-day retention capacity (attribution window), be accessed by key-value, and all by spending as little as possible?

The answer is…none! Based on our research, all the available options that met the technical requirements were way out of our budget.

So…how do we solve it? This is where creativity and the combination of different AWS services come into play.

We started to analyze the time dispersion of the events, trying to find some clues. The interesting thing we spotted was that 90% of what we call “post-bid events” (impression, click, install, etc.) happened within one hour after the auction took place.

That means that we can process 90% of post-bid events by storing just one hour of bids.

Under our current workload, in one hour we participate in approximately 3.7 billion auctions, generating 100 million bid records of an average 600 bytes each. This adds up to 55 gigabytes per hour, a much more manageable amount of data to process.

Instead of thinking about one single database to store all the bid requests, we decided to split bids into two different categories:

  • Hot Bid: A request that took place within the last hour (small amount and frequently accessed)
  • Cold Bid: A request that took place more than one hour ago (huge amount and infrequently accessed)

Amazon ElastiCache for Redis is the best option to store 55 GB of data in memory, which gives us the ability to query in a key-value way with the lowest possible latency.

Hot Bids flow

Hot Bids flow diagram

  1. Every new bid is a hot bid by definition so it’s going to be stored in the hot bids Redis cluster.
  2. At the moment of the user interaction with the ad, the Smadex tracker component receives an HTTPS notification, including the bid request UUID that originated it.
  3. Based on the date of occurrence extracted from the received UUID, the tracker component can determine whether it’s looking for a hot bid or not. If it’s a hot bid, the tracker reads it directly from Redis, performing a key-value lookup query (see the sketch after this list).
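
The following is a minimal Python sketch of that write-and-lookup pattern using the redis-py client. The endpoint, key layout, and JSON payload are illustrative assumptions, not Smadex’s actual schema; the essential ideas are keying by the bid request UUID and setting a one-hour expiry.

    import json
    from typing import Optional

    import redis

    # Illustrative endpoint; in practice this is the ElastiCache for Redis cluster endpoint.
    HOT_BIDS = redis.Redis(host="hot-bids.example.cache.amazonaws.com", port=6379)
    HOT_TTL_SECONDS = 3600  # a bid stays "hot" for one hour

    def store_hot_bid(request_id: str, bid: dict) -> None:
        """Write the bid record under its request UUID with a one-hour expiry."""
        HOT_BIDS.setex(request_id, HOT_TTL_SECONDS, json.dumps(bid))

    def lookup_hot_bid(request_id: str) -> Optional[dict]:
        """Key-value lookup used by the tracker when a post-bid event arrives."""
        raw = HOT_BIDS.get(request_id)
        return json.loads(raw) if raw is not None else None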

It has been easy so far, but what do we do with the other 29 days and 23 hours we need to store?

Challenge #2: Performance

As we previously mentioned, cold bids are a huge number of infrequently accessed records, with only 10% of post-bid events pointing to them. That sounds like a good use case for an inexpensive, slower data store like Amazon S3.

Thanks to S3’s low-cost storage combined with the ability to query S3 objects directly using Amazon Athena, we were able to optimize our costs by storing and querying cold bids with a serverless architecture.

Cold Bids Flow

Cold Bids flow diagram

  1. Incoming bids are buffered by Fluentd and flushed to S3 every minute in JSON format. Each file flushed to S3 contains all the bids processed by a specific EC2 instance during that minute.
  2. An AWS Lambda function is automatically triggered on every new PutObject event from S3. This function transforms the JSON records to Parquet format and saves them back to the S3 bucket, this time into a specific partition folder based on the file creation timestamp (a minimal sketch of such a function follows this list).
  3. As in the hot bids flow, the tracker component determines whether it’s looking for a hot or a cold bid based on the timestamp extracted from the request UUID. In this case, the cold bid is retrieved by running an Amazon Athena lookup query, leveraging partitions and the Parquet format to minimize both the latency and the amount of data that needs to be scanned.
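
To make step 2 concrete, here is a rough Python sketch of such a Lambda function. It assumes the incoming files are newline-delimited JSON, that pandas and pyarrow are bundled in a Lambda layer, and that the cold-bids partition layout shown is only illustrative.

    import io
    import os
    import urllib.parse
    from datetime import datetime, timezone

    import boto3
    import pandas as pd  # assumption: pandas and pyarrow are bundled in a Lambda layer

    s3 = boto3.client("s3")
    OUTPUT_PREFIX = "cold-bids"  # illustrative partition root, not Smadex's actual layout

    def handler(event, context):
        for record in event["Records"]:
            bucket = record["s3"]["bucket"]["name"]
            key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

            # Read the one-minute batch of newline-delimited JSON bids flushed by Fluentd.
            raw = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
            bids = pd.read_json(io.StringIO(raw), lines=True)

            # Write the same records back as Parquet, partitioned by creation time
            # so that Athena can prune partitions and scan less data.
            now = datetime.now(timezone.utc)
            base = os.path.basename(key)
            out_key = f"{OUTPUT_PREFIX}/date={now:%Y-%m-%d}/hour={now:%H}/{base}.parquet"

            local_path = f"/tmp/{base}.parquet"
            bids.to_parquet(local_path, engine="pyarrow")
            s3.upload_file(local_path, bucket, out_key)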

Conclusion

Thanks to this combined approach using different technologies and a variety of AWS services we were able to extend our attribution window from 30 to 90 days while reducing the infrastructure costs by 45%.


Query, visualize, and forecast TruFactor web session intelligence with AWS Data Exchange

Post Syndicated from Jay Park original https://aws.amazon.com/blogs/big-data/query-visualize-and-forecast-trufactor-web-session-intelligence-with-aws-data-exchange/

Given the infinite nature of data, finding the right data set to gain business insights can be a challenge. You can improve your business by having access to a central repository of various data sets to query, visualize, and forecast. With AWS Data Exchange, finding the right data set has become much simpler. As an example, you can use data sets on web session visitation and demographics to help you understand which demographic groups visit your website most frequently. You can then improve your business through machine learning (ML) models and visitation forecasts.

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. After you subscribe to a data product within AWS Data Exchange, you can use the AWS Data Exchange API, AWS CLI, or the AWS Management Console to load data into Amazon S3 directly. You can then analyze the imported data with a wide variety of AWS services, ranging from analytics to machine learning.

This post showcases TruFactor Intelligence-as-a-Service data on AWS Data Exchange. TruFactor’s anonymization platform and proprietary AI ingest, filter, and transform more than 85 billion high-quality raw signals daily from wireless carriers, OEMs, and mobile apps into a unified phygital consumer graph across physical and digital dimensions. TruFactor intelligence is application-ready for use within any AWS analytics or ML service to power your models and applications running on AWS, with no additional processing required. Common use cases include the following:

  • Consumer segmentation – Web intelligence on internet browsing behavior in the US provides a complete view of the consumer, including interests, opinions, values, digital behavior, and sentiment, to inform segmentation of your customers and those of your competitors.
  • Customer acquisition or churn campaigns – Internet browsing behavior can identify affinity properties for new prospects as well as switching to competitors’ websites.

This walkthrough uses TruFactor’s Daily Mobile Web Session Index and Daily Demographics by Mobile Web Sessions data sets, which are both available for free subscription through the AWS Data Exchange console. While there are commercial data sets available for purchase in AWS Data Exchange, this post uses trial data sets to showcase the breadth and depth of analytics possible with TruFactor’s intelligence.

This TruFactor intelligence is aggregated on over 3 billion records from telco carrier networks and mobile apps per day, originating from approximately 30 million consistent users, distilled into session-level information that provides a complete view of user digital interests. The accuracy, breadth of data provided, and the persistency of the panel deliver a unified view of consumers that can inform insights or power analytic models or applications on AWS.

These two data sets have applications across verticals such as retail, financial services, and advertising. Common use cases include creating detailed customer segmentation (for example, full DNA maps of consumers based on visits to specific web HTTP hosts), identifying affinity properties, and estimating demand for apps or services. This intelligence is also ideal for identifying trends and changes over time.

Solution overview

The following diagram illustrates the architecture of the solution.

The workflow is comprised of the following steps:

  1. Subscribe to a data set from AWS Data Exchange and export to Amazon S3
  2. Run an AWS Glue crawler to load product data
  3. Perform queries with Amazon Athena
  4. Visualize the queries and tables with Amazon QuickSight
  5. Run an ETL job with AWS Glue
  6. Create a time series forecast with Amazon Forecast
  7. Visualize the forecasted data with Amazon QuickSight

This post looks at the demographic distributions across various websites and how to use ML to forecast website visitation.

Walkthrough overview

The walkthrough includes the following steps:

  1. Subscribe to a TruFactor data set from the AWS Data Exchange console and export the data set to Amazon S3
  2. Use an AWS Glue crawler to load the product data into an AWS Glue Data Catalog
  3. Use Amazon Athena for SQL querying
  4. Visualize the query views and tables with Amazon QuickSight
  5. Use AWS Glue jobs to extract, transform, and load your data for forecasting with Amazon Forecast
  6. Use Amazon Forecast to create a time series forecast of the transformed data
  7. Visualize the forecasted web visitation data with Amazon QuickSight

You do not have to perform additional processing or manipulation of the TruFactor intelligence for this walkthrough.

The data sets

The TruFactor data sets this post uses are in Parquet format and snappy compression. The following section provides additional details and schema for each data set.

TruFactor Daily Mobile Web Session Index (US – Nationwide) — Trial

The TruFactor Daily Mobile Web Session Index (US – Nationwide) — Trial data set provides aggregate information per HTTP host as a view of the internet browsing behavior in the US. TruFactor generates the data from high-quality packet layer data sourced from mobile carriers that includes the mobile internet traffic originating from a user’s device. TruFactor derives the projected counts from observed counts that are filtered for exclusion and anonymized to make sure users cannot be re-identified. It extrapolates values from US Census data using a proprietary algorithm. For the avoidance of doubt, this data set does not include user-level data.

The following screenshot shows the schema for the mobile web session data set by HTTP host, session time, MB transferred, number of events, sessions, users, and dates.

TruFactor Daily Demographics by Mobile Web Session (US) — Trial

The TruFactor Daily Demographics by Mobile Web Session (US) — Trial data set includes aggregate demographics: a projected distribution of users per HTTP host as a view of the internet browsing behavior in the US. TruFactor generates the data from high-quality packet layer data sourced from mobile carriers that includes the mobile internet traffic originating from a user’s device. TruFactor derives the distribution from observed counts that are filtered for exclusion and anonymized to make sure users cannot be re-identified. It extrapolates values from US Census data using a proprietary algorithm. Demographics include gender, age range, ethnicity, and income range.

The following screenshot shows the partial schema for the demographics by web session data set. The full schema includes the following attributes: HTTP host, age ranges, genders, ethnicity, income ranges, and date.

Prerequisites

To complete this walkthrough successfully, you must have the following resources:

  • An AWS account.
  • Familiarity with AWS core services and concepts.
  • The ability to launch new resources in your account. Some resources may not be eligible for Free Tier usage and might incur costs.
  • Subscription to TruFactor’s Daily Mobile Web Session Index (US – Nationwide) – Trial and Daily Demographics by Mobile Web Session (US) – Trial data sets. For instructions on subscribing to a data set on AWS Data Exchange, see AWS Data Exchange – Find, Subscribe To, and Use Data Products.

Using AWS Data Exchange, Amazon S3, AWS Glue, Amazon Athena, and Amazon QuickSight

This section examines the key demographics of visitors to the top seven e-commerce websites. This information can help you understand which demographic groups are visiting your website most frequently and also help you target ads and cater to certain demographics groups. You use AWS Glue crawlers to crawl your data sets in Amazon S3, populate your AWS Glue Data Catalog, query the AWS Glue Data Catalog using Amazon Athena, and use Amazon QuickSight to visualize the queries.

Step 1: Exporting the data from AWS Data Exchange to Amazon S3

To export your TruFactor data set subscriptions into an Amazon S3 bucket, complete the following steps:

  1. Create an Amazon S3 bucket in your working account. For the purposes of our demo, we have named our S3 bucket trufactor-data-exchange-bucket.
  2. Create two folders within the S3 bucket: web_sess and demo_by_web_sess.

This post uses a trial data set with a sample of 14 days. A paid subscription to TruFactor’s Web Sessions data on AWS Data Exchange includes 6 months of historical data, which refreshes daily.

The following screenshot shows the two folders within the S3 bucket. You are now ready to export the data sets.

  1. On the AWS Data Exchange console, under Subscriptions, locate TruFactor Daily Mobile Web Sessions Index (US – Nationwide) – Trial.
  2. Under Revisions, choose the most recent Revision ID.
  3. Choose all assets except the manifest.json files.
  4. Choose Export to Amazon S3.
  5. In the window that opens, choose the S3 bucket and folder to export the product data into.
    • Export all the assets into the S3 bucket’s web_sess folder.
  6. Repeat the previous steps for the TruFactor Daily Demographics by Mobile Web Sessions (US) – Trial data set, with the following change:
    • Export the assets into the demo_by_web_sess folder in your S3 bucket.
  7. Check to make sure you successfully imported the TruFactor data sets in the Overview. The following screenshot shows that the data sets are partitioned into folders by date. Each folder contains Parquet files of web session data for each day.
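
If you prefer to script the export instead of using the console, the AWS Data Exchange API exposes the same operation. The following is a rough boto3 sketch under the assumption that you look up the data set, revision, and asset IDs from your subscription first (the IDs below are placeholders); you would repeat it with the demo_by_web_sess key prefix for the second data set.

    import boto3

    dx = boto3.client("dataexchange")

    # Placeholders: copy these IDs from your entitled data set and its latest revision.
    DATA_SET_ID = "<your-data-set-id>"
    REVISION_ID = "<your-revision-id>"

    # Export every asset in the revision except the manifest into the web_sess folder.
    assets = dx.list_revision_assets(DataSetId=DATA_SET_ID, RevisionId=REVISION_ID)["Assets"]
    destinations = [
        {"AssetId": a["Id"], "Bucket": "trufactor-data-exchange-bucket", "Key": f"web_sess/{a['Name']}"}
        for a in assets
        if not a["Name"].endswith("manifest.json")
    ]

    job = dx.create_job(
        Type="EXPORT_ASSETS_TO_S3",
        Details={
            "ExportAssetsToS3": {
                "DataSetId": DATA_SET_ID,
                "RevisionId": REVISION_ID,
                "AssetDestinations": destinations,
            }
        },
    )
    dx.start_job(JobId=job["Id"])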

Step 2: Populating your AWS Glue Data Catalog with the TruFactor data sets

Now that you have successfully exported the TruFactor data sets into an Amazon S3 bucket, you create and run an AWS Glue crawler to crawl your Amazon S3 bucket and populate the AWS Glue Data Catalog. Complete the following steps:

  1. On the AWS Glue console, under Data Catalog, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter a name; for example, trufactor-data-exchange-crawler.
  4. For Crawler source type, choose Data stores.
  5. Choose Next.
  6. For Choose a data store, choose S3.
  7. For Crawl data in, select Specified path in my account.
  8. For Include path, enter the path for the web_sess data set folder. The crawler points to the following path: s3://<trufactor-data-exchange-bucket>/web_sess.
  9. Choose Next.
  10. Select Yes to Add another data store.
  11. Choose Next.
  12. For Include path, enter the path for the demo_by_web_sess data set folder. The crawler points to the following path: s3://<trufactor-data-exchange-bucket>/demo_by_web_sess.
  13. Choose Next.
  14. In the Choose an IAM role section, select Create an IAM role. This is the role that the AWS Glue crawler and AWS Glue jobs use to access the Amazon S3 bucket and its content.
  15. For IAM role, enter the suffix demo-data-exchange.
  16. Choose Next.
  17. In the schedule section, leave the Frequency with the default Run on Demand.
  18. Choose Next.
  19. In the Output section, choose Add database.
  20. Enter a name for the database; for example, trufactor-db.
  21. Choose Next, then choose Finish. This database contains the tables that the crawler discovers and populates. With these data sets separated into different tables, you can join and relationalize the data.
  1. In the Review all steps section, review the crawler settings and choose Finish.
  2. Under Data Catalog, choose Crawlers.
  3. Select the crawler you just created.
  4. Choose Run crawler. The AWS Glue crawler crawls the data sources and populates your AWS Glue Data Catalog. This process can take up to a few minutes. When the crawler is finished, you can see two tables added to your crawler details. See the following screenshot. You can now view your new tables.
  1. Under Databases, choose Tables.
  2. Choose your database.
  3. Choose View the tables. The table names correspond to the Amazon S3 folders you pointed your AWS Glue crawler at. See the following screenshot.
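
The same crawler can be created and started from code. The following boto3 sketch mirrors the console steps above; it assumes the IAM role from step 14 already exists and uses the names chosen in this walkthrough.

    import boto3

    glue = boto3.client("glue")
    BUCKET = "trufactor-data-exchange-bucket"

    # Crawl both exported folders and register the tables in the trufactor-db database.
    glue.create_crawler(
        Name="trufactor-data-exchange-crawler",
        Role="AWSGlueServiceRole-demo-data-exchange",  # role created in step 14
        DatabaseName="trufactor-db",
        Targets={
            "S3Targets": [
                {"Path": f"s3://{BUCKET}/web_sess"},
                {"Path": f"s3://{BUCKET}/demo_by_web_sess"},
            ]
        },
    )
    glue.start_crawler(Name="trufactor-data-exchange-crawler")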

Step 3: Querying the data using Amazon Athena

After you populate the AWS Glue Data Catalog with TruFactor’s Mobile Web Session and Demographics data, you can use Amazon Athena to run SQL queries and create views for visualization. Complete the following steps:

  1. On the Amazon Athena console, choose Query Editor.
  2. On the Database drop-down menu, choose the database you created.
  3. To preview one of the tables in Amazon Athena, choose Preview table.
    In the Results section, you should see 10 records from the web_sess table. See the following screenshot. In this next step, you run a query that creates a view of the Web Session Index and Demographics data across a group of e-commerce HTTP hosts, broken down by the percentage of users by age and gender, the number of users, MB transferred, and the number of sessions, ordered by date.
  4. Run the following SQL query in Amazon Athena:
    CREATE OR REPLACE VIEW e_commerce_web_sess_data AS 
    SELECT
      "date_parse"("a"."partition_0", '%Y%m%d') "date",
      "a"."http_host",
      "a"."users",
      "a"."mb_transferred",
      "a"."number_of_sessions",
      "b"."18_to_25",
      "b"."26_to_35",
      "b"."36_to_45",
      "b"."46_to_55",
      "b"."56_to_65",
      "b"."66_to_75",
      "b"."76_plus",
      "b"."male",
      "b"."female"
    FROM  
      ((
       SELECT
         "partition_0",
         "http_host",
         "users",
         "mb_transferred",
         "number_of_sessions"
       FROM
         "trufactor-db"."web_sess"
       WHERE ("http_host" IN ('www.amazon.com', 'www.walmart.com', 'www.ebay.com', 'www.aliexpress.com', 'www.etsy.com', 'www.rakuten.com', 'www.craigslist.com'))
    )  a
    LEFT JOIN (
       SELECT
         "http_host" "http_host_2",
         "partition_0" "partition_2",
         "age_ranges"."18_to_25",
         "age_ranges"."26_to_35",
         "age_ranges"."36_to_45",
         "age_ranges"."46_to_55",
         "age_ranges"."56_to_65",
         "age_ranges"."66_to_75",
         "age_ranges"."76_plus",
         "genders"."male",
         "genders"."female"
       FROM
         "trufactor-db"."demo_by_web_sess"
       WHERE ("http_host" IN ('www.amazon.com', 'www.walmart.com', 'www.ebay.com', 'www.aliexpress.com', 'www.etsy.com', 'www.rakuten.com', 'www.craigslist.com'))
    )  b ON (("a"."http_host" = "b"."http_host_2") AND ("a"."partition_0" = "b"."partition_2")))
    ORDER BY "date" ASC

  5. After you create the view, you can preview it by repeating the above steps for previewing a table. The following screenshot shows the results, which include the number of users, user percentages by age group and gender, and a list of e-commerce hosts listed by date.
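
You can also run the same statements programmatically. The following boto3 sketch submits a query, waits for it to finish, and fetches the results; the results bucket is a placeholder for the query result location configured in your account.

    import time

    import boto3

    athena = boto3.client("athena")

    def run_athena_query(sql, database, output_location):
        """Submit a query and block until it reaches a terminal state."""
        qid = athena.start_query_execution(
            QueryString=sql,
            QueryExecutionContext={"Database": database},
            ResultConfiguration={"OutputLocation": output_location},
        )["QueryExecutionId"]
        while True:
            state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
            if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
                return qid
            time.sleep(2)

    # Example: preview the view created above.
    qid = run_athena_query(
        "SELECT * FROM e_commerce_web_sess_data LIMIT 10",
        database="trufactor-db",
        output_location="s3://<your-athena-results-bucket>/queries/",
    )
    rows = athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"]
    print(f"Fetched {len(rows) - 1} rows")  # the first row holds the column headers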

Step 4: Visualizing with Amazon QuickSight

After you query your data sets in Amazon Athena, you can use Amazon QuickSight to visualize your results. You must first grant Amazon QuickSight access to the Amazon S3 bucket that holds the TruFactor data sets, which you can do through the Manage QuickSight setting on the Amazon QuickSight console. After you grant access to the Amazon S3 bucket, you visualize the tables and queries with Amazon QuickSight. Complete the following steps:

  1. In the Amazon QuickSight console, choose New Analysis.
  2. Choose New data set.
  3. Choose Athena as the data source.
  4. For Data source name, enter trufactor-data-exchange-source.
  5. From the drop-down menu, choose the database and view you created.
  6. Choose Directly query your data.
  7. Choose Visualize. Because TruFactor intelligence is application-ready, you can gain immediate insights by using Amazon Athena to query and Amazon QuickSight to visualize. This post includes visualizations of the data set for the first two weeks of October 2019. The following graph visualizes the number of users on different HTTP hosts. The following pie charts further filter the HTTP hosts by age range. The following bar chart offers another visualization of users by age range. You could add other fields such as income range, ethnicity, and gender.

Running AWS Glue Jobs and Amazon Forecast

This section discusses how to use AWS Glue jobs to query and export your data set for forecasting with Amazon Forecast. This walkthrough examines users’ visitation over 14 days across the top 50 HTTP hosts ranked by users’ visitation. From there, you forecast the users’ visitation for these HTTP hosts for the next three days.

Step 1: Creating and running an AWS Glue job

To create and run your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a name for the AWS Glue job; for example, demo-glue-job.
  4. For Type and Glue version, keep the default values.
  5. For This job runs, select A new script to be authored by you.
  6. In the Security configuration, script libraries, and job parameters (optional) section, set the Maximum capacity cluster size to 2. This reduces the cost of running the AWS Glue job. By default, the cluster size is set to 10 Data Processing Units (DPU).
  7. Choose Next.
  8. In the Connections section, keep the default values.
  9. Choose Save job and edit script.
  10. Enter the following code in the script section, and replace YOUR_BUCKET_NAME on line 42 with the name of your bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    db_name = "trufactor-db"
    tbl_name = "web_sess"
    
    web_sess_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name, transformation_ctx = "web_sess_dyf")
    web_sess_df = web_sess_dyf.toDF()
    web_sess_df.createOrReplaceTempView("webSessionTable")
    web_sess_sql_df = spark.sql("""
    SELECT to_date(partition_0, 'yyyyMMdd') AS date,
             http_host,
             users
    FROM 
        (SELECT partition_0,
             http_host,
             users,
             row_number()
            OVER ( PARTITION By partition_0
        ORDER BY users DESC ) AS rn
        FROM webSessionTable )
    WHERE rn<=50
    ORDER BY date""")
    
    web_sess_sql_df.coalesce(1).write.format("csv").option("header","false").save("s3://YOUR_BUCKET_NAME/amazon_forecast_demo/dataset/sampleset")
    job.commit()

    This code queries the top 50 HTTP hosts, ranked by users’ visitation during the first half of October and returns the users, date, and HTTP hosts columns. The query results upload to your Amazon S3 bucket in CSV format (you need the files in CSV to use Amazon Forecast).

  11. Choose Save and close the AWS Glue job screen. Before you can run the AWS Glue job, you need to modify the IAM role associated with AWS Glue. Currently, the IAM role only has permission to get and put objects in the directories you specified earlier. You need to update the IAM policy to allow permission to get and put objects in all subdirectories of the Amazon S3 bucket.
  12. On the IAM console, choose the role you used for this walkthrough: AWSGlueServiceRole-demo-data-exchange.
  13. In the Summary section for the IAM role, on the Permissions tab, choose the IAM policy associated with the Managed policy.
  14. Choose Edit policy.
  15. Change the view from Visual editor to JSON.
  16. Within this JSON object, under Resource, add another resource into the list of values. The following code is the updated IAM policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::trufactor-data-exchange-bucket/web_sess*",
                    "arn:aws:s3:::trufactor-data-exchange-bucket/demo_by_web_sess*",
                    "arn:aws:s3:::trufactor-data-exchange-bucket/*"
                ]
            }
        ]
    }

  17. Choose Review policy and Save changes.
  18. On the AWS Glue console, under ETL, choose Jobs. Select the job you created earlier.
  19. From the Action drop-down menu, choose Run job. On the History tab, you can see when the status changes to Succeeded. See the following screenshot. This job can take 15–20 minutes to complete.
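
If you would rather start the job from a script and watch it there, a small boto3 sketch like the following does the same thing, assuming the job name used above:

    import time

    import boto3

    glue = boto3.client("glue")

    run_id = glue.start_job_run(JobName="demo-glue-job")["JobRunId"]
    while True:
        state = glue.get_job_run(JobName="demo-glue-job", RunId=run_id)["JobRun"]["JobRunState"]
        print("Job state:", state)
        if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
            break
        time.sleep(60)  # the job typically takes 15-20 minutes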

Step 2: Creating a dataset group, training a predictor, and creating forecasts in Amazon Forecast

To create your dataset group, train a predictor, and create forecasts, complete the following steps:

  1. On the Amazon Forecast console, choose View dataset groups.
  2. Choose Create dataset group.
  3. For Dataset group name, enter a name; for example, users_visitation_sample_dataset_group.
  4. For Forecasting domain, choose Web traffic.
  5. Choose Next.
  6. On the Create target time series dataset page, for Dataset name, enter the name of your dataset; for example, users_visitation_sample_dataset.
  7. For Frequency of your data, choose 1 day.
  8. For Data schema, update the data schema JSON object with the following code:
    {
      "Attributes":[
        {
          "AttributeName": "timestamp",
          "AttributeType": "timestamp"
        },
        {
          "AttributeName": "item_id",
          "AttributeType": "string"
        },
        {
          "AttributeName": "value",
          "AttributeType": "float"
        }
      ]
    }

  9. Choose Next.
  10. On the Import target time series data page, for Dataset import name, enter your dataset name; for example, users_visitation_sample_dataset_import.
  11. For Timestamp format, enter yyyy-MM-dd.
  12. For IAM Role, create a new role and grant Amazon Forecast access to the S3 bucket that you are using for this demo.
  13. For Data Location, use the S3 path that you exported your CSV file to after the AWS Glue job: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/dataset/sampleset.
  14. Review the settings for import target time series data and choose Start import. The process of importing the data can take approximately 10 minutes. When the status changes to Active, you can begin training a predictor.
  1. On the Dashboard page, choose Start next to Predictor training.
  2. On the Train predictor page, for Predictor name, enter a name for the predictor; for example, users_visitation_sample_dataset_predictor.
  3. For Forecast horizon, choose 3.
  4. For Forecast frequency, choose day.
  5. For Algorithm selection, select Manual. If you use the other algorithm option, AutoML, you allow Amazon Forecast to choose the right algorithm based on a pre-defined objective function, which is not necessary for this walkthrough.
  6. For Algorithm, choose Deep_AR_Plus (you use deep learning to forecast users’ visitation across 50 HTTP hosts).
  7. Leave all other options with the default values.
  8. Review the settings and choose Train predictor. The predictor training process can take 20–30 minutes. When the training completes, the status changes to Active. To evaluate the predictor’s (ML model) accuracy, Amazon Forecast splits the input time series data into two data sets: training and test. This process tests a predictive model on historical data and is called backtesting. When it splits the input time series data, it maintains the data’s order, which is crucial for time series data. After training, Amazon Forecast calculates the root mean square error (RMSE) and weighted quantile losses to determine how well the predictor performed. For more detailed information about backtesting and predictor metrics, see Evaluating Predictor Accuracy. When the predictor is finished training, you can create a forecast.
  9. On the Dashboard page, under Generate forecasts, choose Start.
  10. For Forecast name, enter a forecast name; for example, users_visitation_sample_forecast.
  11. For Predictor, choose your trained predictor.
  12. For Forecast types, you can enter any values between 0.01 and 0.99 and the mean. These are percentage probabilities of satisfying the original demand. This post enters .50, .90, .99, mean.
  13. Choose Create a forecast. The forecast creation process can take 15–20 minutes.
  14. When the forecast is complete, choose Forecasts.
    You should see a single forecast. See the following screenshot.
    You can now export the generated forecast to a new folder within your existing Amazon S3 bucket for visualization with Amazon QuickSight.
  1. Choose the newly generated forecast.
  2. Under Exports, choose Create forecast export.
  3. For Export name, enter a name for the export; for example, users_visitation_sample_forecast_export.
  4. For Generated forecast, choose users_visitation_sample_forecast.
  5. For IAM Role, choose the role you created earlier.
  6. For S3 forecast export location, enter the S3 path to store the forecasts: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/forecasts/sampleset.
  7. Choose Create forecast export. The exporting process can take up to 5 minutes. Alternatively, you can visualize the user visitation forecasts for the 50 HTTP hosts directly through the Amazon Forecast console or Query API.
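
For reference, the console steps in this section map closely onto the Amazon Forecast API, so the same resources can be created from a script. The following boto3 sketch mirrors them under a few assumptions: the IAM role ARN is a placeholder, the polling loops only wait for ACTIVE (a production script would also handle failed states), and the forecast export step is omitted.

    import time

    import boto3

    forecast = boto3.client("forecast")
    ROLE_ARN = "<arn-of-the-role-that-lets-forecast-read-the-bucket>"  # placeholder
    S3_PATH = "s3://trufactor-data-exchange-bucket/amazon_forecast_demo/dataset/sampleset"

    dsg_arn = forecast.create_dataset_group(
        DatasetGroupName="users_visitation_sample_dataset_group",
        Domain="WEB_TRAFFIC",
    )["DatasetGroupArn"]

    ds_arn = forecast.create_dataset(
        DatasetName="users_visitation_sample_dataset",
        Domain="WEB_TRAFFIC",
        DatasetType="TARGET_TIME_SERIES",
        DataFrequency="D",
        Schema={"Attributes": [
            {"AttributeName": "timestamp", "AttributeType": "timestamp"},
            {"AttributeName": "item_id", "AttributeType": "string"},
            {"AttributeName": "value", "AttributeType": "float"},
        ]},
    )["DatasetArn"]
    forecast.update_dataset_group(DatasetGroupArn=dsg_arn, DatasetArns=[ds_arn])

    import_arn = forecast.create_dataset_import_job(
        DatasetImportJobName="users_visitation_sample_dataset_import",
        DatasetArn=ds_arn,
        DataSource={"S3Config": {"Path": S3_PATH, "RoleArn": ROLE_ARN}},
        TimestampFormat="yyyy-MM-dd",
    )["DatasetImportJobArn"]

    # Each resource must be ACTIVE before the next step can use it.
    while forecast.describe_dataset_import_job(DatasetImportJobArn=import_arn)["Status"] != "ACTIVE":
        time.sleep(60)

    predictor_arn = forecast.create_predictor(
        PredictorName="users_visitation_sample_dataset_predictor",
        AlgorithmArn="arn:aws:forecast:::algorithm/Deep_AR_Plus",
        ForecastHorizon=3,
        PerformAutoML=False,
        InputDataConfig={"DatasetGroupArn": dsg_arn},
        FeaturizationConfig={"ForecastFrequency": "D"},
    )["PredictorArn"]

    while forecast.describe_predictor(PredictorArn=predictor_arn)["Status"] != "ACTIVE":
        time.sleep(60)

    forecast.create_forecast(
        ForecastName="users_visitation_sample_forecast",
        PredictorArn=predictor_arn,
        ForecastTypes=["0.50", "0.90", "0.99", "mean"],
    )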

Step 3: Querying a view using Amazon Athena and downloading the forecast file

Before you visualize users’ visitation forecast data, create a view in Amazon Athena for the top 50 HTTP hosts ranked by users’ visitation over 14 days. Complete the following steps:

  1. Run the following query in Amazon Athena:
    CREATE OR REPLACE VIEW "top_50_users" AS
    SELECT date_format(date_parse(partition_0,
             '%Y%m%d'),'%Y-%m-%d') AS "date", http_host, users
    FROM 
        (SELECT partition_0,
             http_host,
             users,
             row_number()
            OVER (PARTITION By partition_0
        ORDER BY  users DESC ) AS rn
        FROM "trufactor-db"."web_sess")
    WHERE rn<=50
    ORDER BY date

    The code queries the top 50 HTTP hosts ranked by users’ visitation sorted by date.

  2. In the Amazon S3 console, navigate to the S3 bucket and directory holding the files: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/forecasts/sampleset. The following screenshot shows three different files inside the folder.
  1. Download the CSV file.
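
Instead of downloading the file by hand in the console, you could also fetch the export with boto3; the sketch below simply downloads every CSV part file under the export prefix.

    import boto3

    s3 = boto3.client("s3")
    BUCKET = "trufactor-data-exchange-bucket"
    PREFIX = "amazon_forecast_demo/forecasts/sampleset"

    # Download the CSV part files written by the forecast export job.
    for obj in s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX).get("Contents", []):
        key = obj["Key"]
        if key.endswith(".csv"):
            filename = key.rsplit("/", 1)[-1]
            s3.download_file(BUCKET, key, filename)
            print("Downloaded", filename)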

Step 4: Visualizing in Amazon QuickSight

To visualize the data in Amazon QuickSight, complete the following steps:

  1. On the Amazon QuickSight console, choose Manage data.
  2. Choose New data set.
  3. Choose Upload a file.
  4. Upload the CSV file that you downloaded.
  5. On the Confirm file upload settings page, choose Next.
  6. Choose Visualize.
  7. Return to the Amazon QuickSight console and choose Manage data.
  8. Choose New data set for the top 50 HTTP hosts view you queried earlier.
  9. On the Create a Data set page, find the data source you created earlier: trufactor-data-exchange-source.
  10. From the drop-down list, choose the database and view you created.
  11. Choose Directly query your data.
  12. Choose Visualize.
  13. On the new Amazon QuickSight analysis page, choose the pencil icon next to Data set.
  14. Choose Add data set.
  15. Choose the CSV file you uploaded.

You now have a single Amazon QuickSight analysis with multiple data sets to visualize.

The following graphs highlight the historical data for the users’ visitation across 50 HTTP hosts for the first two weeks of October and the mean forecast for users’ visitation for the next three days.

The following graphs highlight the historical data and forecasted P50, P90, and P99 quantile values for www.google.com.

Amazon Forecast makes it easier to get started with machine learning without having to create your own ML models from scratch. You can use this information to anticipate the web traffic for the upcoming week, which can aid in scaling your resources and applications accordingly.

Cleaning up

To avoid incurring future charges, delete the following resources that you created in this walkthrough:

  • The Amazon S3 bucket trufactor-data-exchange-bucket
  • The AWS Glue crawler trufactor-data-exchange-crawler
  • The AWS Glue job demo-glue-job
  • The AWS IAM role AWSGlueServiceRole-demo-data-exchange
  • The AWS Glue database trufactor-db
  • The Amazon QuickSight demo data sets and analysis
  • The following Amazon Forecast resources (in this order) for users_visitation_sample_dataset_group via the console:
    • Existing forecasts under Forecasts
    • Existing predictors under Predictors
    • Existing datasets under Datasets

Conclusion

This walkthrough detailed how to import a data set to Amazon S3 from AWS Data Exchange, use AWS Glue to run crawlers and an ETL job on the data, run SQL queries with Amazon Athena, create a time series forecast of the queried data with Amazon Forecast, and visualize the queried and forecasted data with Amazon QuickSight.

This post used TruFactor Intelligence-as-a-Service, one of the AWS Data Exchange launch partners, to power this walkthrough. TruFactor intelligence on AWS Data Exchange highlighted the ease of loading directly into Amazon S3 and layering advanced AWS services.

For more information about TruFactor and the AWS Data Exchange, see TruFactor on AWS Data Exchange on the TruFactor website. You can subscribe to TruFactor Intelligence directly on AWS Data Exchange or engage with TruFactor directly to identify the right offering from the larger product portfolio of anonymized consumer intelligence.


About the Authors

Jay Park is a solutions architect at AWS.

Ariana Rahgozar is a solutions architect at AWS.