Tag Archives: Analytics

Trakstar unlocks new analytical opportunities for its HR customers with Amazon QuickSight

Post Syndicated from Brian Kasen original https://aws.amazon.com/blogs/big-data/trakstar-unlocks-new-analytical-opportunities-for-its-hr-customers-with-amazon-quicksight/

This is a guest post by Brian Kasen and Rebecca McAlpine from Trakstar, now a part of Mitratech.

Trakstar, now a part of Mitratech, is a human resources (HR) software company that serves customers from small businesses and educational institutions to large enterprises, globally. Trakstar supercharges employee performance around pivotal moments in talent development. Our team focuses on helping HR leaders make smarter decisions to attract, retain, and engage their workforce. Trakstar has been used by HR leaders for over 20 years, specifically in the areas of applicant tracking, performance management, and learning management.

In 2023, Trakstar joined Mitratech’s world-class portfolio of Human Resources and Compliance products. This includes complementary solutions for OFCCP compliance management; diversity, equity, and inclusion (DEI) strategy and recruiting (now with Circa); advanced background screening; I-9 compliance; workflow automation; policy management; and more.

In 2022, Trakstar launched what is now called Trakstar Insights, which unlocks new analytical insights for HR across the employee life cycle. It’s powered by Amazon QuickSight, a cloud-native business intelligence (BI) tool that lets us embed customized, interactive visuals and dashboards within the product experience.

In this post, we discuss how we use QuickSight to deliver powerful HR analytics to over 3,000 customers and 43,000 users while forecasting savings of 84% year-over-year as compared to our legacy reporting solutions.

The evolving HR landscape

Over the past few years, new realities have emerged, creating unfamiliar challenges for businesses and new opportunities for HR leaders. In 2020, new working arrangements were spawned by immediate necessity, with many companies shifting to fully remote or hybrid setups for the first time. As we continue to adapt to this new environment, organizations have trouble finding talent amid record-level resignation rates and a tight labor market.

As companies look to combat these new challenges, we’ve seen the rise of the Chief People Officer because organizations now recognize people as their greatest asset. With our three products, Trakstar Hire, Trakstar Perform, and Trakstar Learn, HR leaders can use data to take an integrated approach and foster a better employee experience.

Choosing QuickSight to bring solutions to the new reality of work

To help HR leaders navigate the new challenges of our time and answer new questions, we decided to embed interactive dashboards directly into each Trakstar product focused on the growing area of people analytics. QuickSight allowed us to meet our customers’ needs quickly and played a key role in our overall analytics strategy. Because QuickSight is fully managed and serverless, we were able to focus on building value for customers and develop an embedded dashboard delivery solution to support all three products, rather than focusing on managing and optimizing infrastructure. QuickSight allowed us to focus on building dashboards that address key pain points for customers and rapidly innovate.

In a 12-month timespan, we designed and launched five workforce dashboards to over 20,000 users, spanning hiring, performance, learning, and peer group benchmarking. During the same 12-month time period, in addition to our Trakstar Insights dashboard releases, we also migrated Trakstar Learn’s legacy reporting to QuickSight, which supports an additional 20,000 users.

Delighting our customers by embedding QuickSight

Our goal was to build something that would delight our customers by making a material difference in their daily lives. We set out to go beyond a typical Minimum Viable Product and instead create a Minimum Lovable Product. By this, we mean delivering something that would make the most significant difference for customers in the shortest time possible.

We used QuickSight to build a curated dashboard that went beyond traditional dashboards of bar charts and tables. Our dashboards present retention trends, hiring trends, and learning outcomes supplemented with data narratives that empower our customers to easily interpret trends and make data-driven decisions.

In January 2022, we launched the Perform Insights dashboards. This enabled our customers to see their data in a way they had never seen before. With this dashboard, HR leaders can compare organizational strengths and weaknesses over time. The power of QuickSight lets our customers slice and dice the data in different ways. As shown in the following screenshot, customers can filter by Review Process Type or Group Type and then take actionable next steps based on data. They can see where top and bottom performers reside within teams and take steps to retain top performers and address lower-performing employees. These were net new analytics for our HR customers.

Our investment in building with QuickSight was quickly validated just days after launch. One of our sales reps was able to engage a lost opportunity and land a multi-year contract for double our typical average contract value. We followed up our first dashboard launch by expanding Trakstar Insights into our other products with the Learning Insights dashboard for Trakstar Learn and Hiring Insights for Trakstar Hire (see the following screenshots). These dashboards provided new lenses into how customers can look at their recruitment and training data.

Through our benchmarking dashboards, we empowered our customers so they can now compare their trends against other Trakstar customers in the same industry or of similar size, as shown in the following screenshots. These benchmarking dashboards can help our customers answer the “Am I normal?” question when it comes to talent acquisition and other areas.

Telling a story with people data for reporting

With the custom narrative visual type in QuickSight, our benchmarking dashboards offer dynamic, customer-specific interpretations of their trends, doing the heavy lifting of interpretation for them while providing action-oriented recommendations. The burden of manual spreadsheet creation, manual export, data manipulation, and analysis has been eliminated for our customers. They can now simply screenshot sections from the dashboards, drop them into a slide deck, and then speak confidently with their executive teams on what the trends mean for their organization, thereby saving tremendous time and effort and opening the door for new opportunities.

A Trakstar Hire customer shared with us, “You literally just changed my life. I typically spend hours creating slides, and this is the content—right here, ready to screenshot for my presentations!”

Building on our success with QuickSight

With the success of launching Trakstar Insights with QuickSight, we knew we could modernize the reporting functionality in Trakstar Learn by migrating to QuickSight from our legacy embedded BI vendor. Our legacy solution was antiquated and expensive. QuickSight brings a more cohesive and modern look to reporting at a significantly lower overall cost. With the session-based pricing model in QuickSight, we are projecting to save roughly 84% this year while offering customers a more powerful analytics experience.

Summary

Building with QuickSight has helped us thrive by delivering valuable HR solutions to our customers. We are excited to continue innovating with QuickSight to deliver even more value to our customers.

To learn more about how you can embed customized data visuals and interactive dashboards into any application, visit Amazon QuickSight Embedded.


About the authors:

Brian Kasen is the Director, Business Intelligence at Mitratech. He is passionate about helping HR leaders be more data-driven in their efforts to hire, retain, and engage their employees. Prior to Mitratech, Brian spent much of his career building analytic solutions across a range of industries, including higher education, restaurant, and software.

Rebecca McAlpine is the Senior Product Manager for Trakstar Insights at Mitratech. Her experience in HR tech has allowed her to work in various areas, including data analytics, business systems optimization, candidate experience, job application management, talent engagement strategy, training, and performance management.

Automate alerting and reporting for AWS Glue job resource usage

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automate-alerting-and-reporting-for-aws-glue-job-resource-usage/

Data transformation plays a pivotal role in providing the necessary data insights for organizations of any size. To gain these insights, customers often perform ETL (extract, transform, and load) jobs from their source systems and output an enriched dataset. Many organizations today are using AWS Glue to build ETL pipelines that bring data from disparate sources and store the data in repositories like a data lake, database, or data warehouse for further consumption. These organizations are looking for ways they can reduce cost across their IT environments and still be operationally performant and efficient.

Picture a scenario where you, the VP of Data and Analytics, are in charge of your data and analytics environments and workloads running on AWS, where you manage a team of data engineers and analysts. This team is allowed to create AWS Glue for Spark jobs in development, test, and production environments. During testing, one of the jobs wasn’t configured to automatically scale its compute resources, which resulted in job runs timing out and cost the organization more than anticipated. The next steps usually include completing an analysis of the jobs, looking at cost reports to see which account generated the spike in usage, going through logs to see what happened with the job and when, and so on. After the ETL job has been corrected, you may want to implement monitoring and set standard alert thresholds for your AWS Glue environment.

This post will help organizations proactively monitor and cost optimize their AWS Glue environments by providing an easier path for teams to measure efficiency of their ETL jobs and align configuration details according to organizational requirements. Included is a solution you will be able to deploy that will notify your team via email about any Glue job that has been configured incorrectly. Additionally, a weekly report is generated and sent via email that aggregates resource usage and provides cost estimates per job.

AWS Glue cost considerations

AWS Glue for Apache Spark jobs are provisioned with a number of workers and a worker type. These jobs can be either G.1X, G.2X, G.4X, G.8X or Z.2X (Ray) worker types that map to data processing units (DPUs). DPUs include a certain amount of CPU, memory, and disk space. The following table contains more details.

Worker Type    DPUs    vCPUs    Memory (GB)    Disk (GB)
G.1X           1       4        16             64
G.2X           2       8        32             128
G.4X           4       16       64             256
G.8X           8       32       128            512
Z.2X           2       8        64             128

For example, if a job is provisioned with 10 workers of the G.1X worker type, the job has access to 40 vCPUs and 160 GB of RAM to process data; the same job with G.2X workers has double that (80 vCPUs and 320 GB of RAM). Over-provisioning workers increases cost because not all workers are used efficiently.
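The capacity arithmetic in this example can be sketched in a few lines of Python. This is only an illustration: the per-worker figures are copied from the table above for the Spark worker types, and the names WORKER_SPECS and job_capacity are made up for this sketch rather than part of any AWS SDK.

```python
# Per-worker resources for the AWS Glue for Spark worker types,
# copied from the worker type table in this post.
WORKER_SPECS = {
    "G.1X": {"dpus": 1, "vcpus": 4, "memory_gb": 16},
    "G.2X": {"dpus": 2, "vcpus": 8, "memory_gb": 32},
    "G.4X": {"dpus": 4, "vcpus": 16, "memory_gb": 64},
    "G.8X": {"dpus": 8, "vcpus": 32, "memory_gb": 128},
}


def job_capacity(worker_type, num_workers):
    """Return the total DPUs, vCPUs, and memory a job is provisioned with."""
    spec = WORKER_SPECS[worker_type]
    # Every worker contributes the same resources, so totals scale linearly.
    return {name: value * num_workers for name, value in spec.items()}


if __name__ == "__main__":
    for worker_type in ("G.1X", "G.2X"):
        print(worker_type, job_capacity(worker_type, 10))
```

Because the totals scale linearly with the worker count, a job that needs only half of its provisioned workers is paying for roughly twice the DPUs it uses.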

In April 2022, Auto Scaling for AWS Glue was released for AWS Glue version 3.0 and later, which includes AWS Glue for Apache Spark and streaming jobs. Enabling auto scaling on your Glue for Apache Spark jobs will allow you to only allocate workers as needed, up to the worker maximum you specify. We recommend enabling auto scaling for your AWS Glue 3.0 & 4.0 jobs because this feature will help reduce cost and optimize your ETL jobs.

Amazon CloudWatch metrics are also a great way to monitor your AWS Glue environment by creating alarms for certain metrics like average CPU or memory usage. To learn more about how to use CloudWatch metrics with AWS Glue, refer to Monitoring AWS Glue using Amazon CloudWatch metrics.

The following solution provides a simple way to set AWS Glue worker and job duration thresholds, configure monitoring, and receive email notifications on how your AWS Glue environment is performing. After a Glue job run has completed, failed, or timed out, the solution checks whether the worker or job duration thresholds were exceeded and notifies you if they were.

Solution overview

The following diagram illustrates the solution architecture.

Solution Architecture

When you deploy this application via AWS Serverless Application Model (AWS SAM), it will ask what AWS Glue worker and job duration thresholds you would like to set to monitor the AWS Glue for Apache Spark and AWS Glue for Ray jobs running in that account. The solution will use these values as the decision criteria when invoked. The following is a breakdown of each step in the architecture:

  1. Any AWS Glue for Apache Spark job that succeeds, fails, stops, or times out is sent to Amazon EventBridge.
  2. EventBridge picks up the event from AWS Glue and triggers an AWS Lambda function.
  3. The Lambda function processes the event and determines if the data and analytics team should be notified about the particular job run. The function performs the following tasks:
    1. The function sends an email using Amazon Simple Notification Service (Amazon SNS) if needed.
      • If the AWS Glue job succeeded or was stopped without going over the worker or job duration thresholds, or is tagged to not be monitored, no alerts or notifications are sent.
      • If the job succeeded but ran with a worker count or job duration higher than the allowed thresholds, or the job either failed or timed out, Amazon SNS sends a notification to the designated email with information about the AWS Glue job, run ID, and reason for alerting, along with a link to the specific run ID on the AWS Glue console.
    2. The function logs the job run information to Amazon DynamoDB for a weekly aggregated report delivered by email. The DynamoDB table has Time to Live (TTL) enabled with a 7-day expiry, which keeps storage to a minimum.
  4. Once a week, a separate Lambda function aggregates the data in DynamoDB into meaningful information such as longest-running jobs, number of retries, failures, timeouts, cost analysis, and more.
  5. Amazon Simple Email Service (Amazon SES) is used to deliver the report because it can be better formatted than using Amazon SNS. The email is formatted via HTML output that provides tables for the aggregated job run data.
  6. The data and analytics team is notified about the ongoing job runs through Amazon SNS, and they receive the weekly aggregation report through Amazon SES.
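As a rough sketch of step 3.2, the following Python shows how a job run record could carry a 7-day expiry for DynamoDB Time to Live. DynamoDB TTL expects a Number attribute holding an epoch timestamp in seconds; the attribute names used here (job_name, run_id, expires_at, and so on) are assumptions for illustration, not the names used by the deployed solution.

```python
import time

SEVEN_DAYS_SECONDS = 7 * 24 * 60 * 60


def build_job_run_item(job_name, run_id, state, now=None):
    """Build a job run record that DynamoDB TTL can expire after 7 days.

    Attribute names are hypothetical; the TTL attribute must be a Number
    containing an epoch timestamp in seconds.
    """
    now = int(time.time()) if now is None else int(now)
    return {
        "job_name": job_name,
        "run_id": run_id,
        "state": state,
        "logged_at": now,
        # DynamoDB deletes the item some time after this timestamp passes.
        "expires_at": now + SEVEN_DAYS_SECONDS,
    }


if __name__ == "__main__":
    print(build_job_run_item("nightly-etl", "jr_example", "SUCCEEDED"))
```

Because the weekly report only needs the last 7 days of runs, expiring older items keeps the table small without a separate cleanup job.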

Note that AWS Glue Python shell and streaming ETL jobs are out of scope for this solution and are not monitored.

Prerequisites

You must have the following prerequisites:

  • An AWS account to deploy the solution to
  • Proper AWS Identity and Access Management (IAM) privileges to create the resources
  • The AWS SAM CLI to build and deploy the solution template in your AWS environment

Deploy the solution

This AWS SAM application provisions the following resources:

  • Two EventBridge rules
  • Two Lambda functions
  • An SNS topic and subscription
  • A DynamoDB table
  • An SES subscription
  • The required IAM roles and policies

To deploy the AWS SAM application, complete the following steps:

Clone the aws-samples GitHub repository:

git clone https://github.com/aws-samples/aws-glue-job-tracker.git

Deploy the AWS SAM application:

cd aws-glue-job-tracker
sam deploy --guided

sam deploy configuration

Provide the following parameters:

  • GlueJobWorkerThreshold – Enter the maximum number of workers you want an AWS Glue job to be able to run with before a threshold alert is sent. The default is 10. An alert will be sent if a Glue job runs with more workers than specified.
  • GlueJobDurationThreshold – Enter the maximum duration in minutes you want an AWS Glue job to run before a threshold alert is sent. The default is 480 minutes (8 hours). An alert will be sent if a Glue job runs longer than specified.
  • GlueJobNotifications – Enter an email or distribution list of those who need to be notified through Amazon SNS and Amazon SES. You can go to the SNS topic after the deployment is complete and add emails as needed.

To receive emails from Amazon SNS and Amazon SES, you must confirm your subscriptions. After the stack is deployed, check the inbox of the email address you specified in the template and confirm by choosing the link in each message. When the application is successfully provisioned, it will begin monitoring your AWS Glue for Apache Spark job environment. The next time a job fails, times out, or exceeds a specified threshold, you will receive an email via Amazon SNS. For example, the following screenshot shows an SNS message about a job that succeeded but had a job duration threshold violation.

You might have jobs that need to run at a higher worker or job duration threshold, and you don’t want the solution to evaluate them. You can simply tag that job with the key/value of remediate and false. The Lambda function will still be invoked, but it will send no alert when it recognizes the tag. For more information on job tagging, refer to AWS tags in AWS Glue.

Adding tags to glue job configuration
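The alerting rules described in this post can be summarized in a short Python sketch. It is a simplified illustration, not the code of the deployed Lambda function: the JobRun class and alert_reasons function are hypothetical names, the defaults mirror the GlueJobWorkerThreshold and GlueJobDurationThreshold defaults, and the handling of a stopped job that exceeded a threshold is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class JobRun:
    """Hypothetical summary of one AWS Glue job run."""
    job_name: str
    state: str  # Glue job run state: SUCCEEDED, FAILED, TIMEOUT, or STOPPED
    workers: int
    duration_minutes: float
    tags: dict = field(default_factory=dict)


def alert_reasons(run, worker_threshold=10, duration_threshold=480):
    """Return the reasons to alert on a run; an empty list means no alert."""
    # Jobs tagged remediate=false are excluded from evaluation.
    if run.tags.get("remediate") == "false":
        return []
    reasons = []
    if run.state in ("FAILED", "TIMEOUT"):
        reasons.append(f"job run ended in state {run.state}")
    # Assumption: thresholds are checked for every state, including STOPPED.
    if run.workers > worker_threshold:
        reasons.append(f"{run.workers} workers exceeds threshold of {worker_threshold}")
    if run.duration_minutes > duration_threshold:
        reasons.append(
            f"{run.duration_minutes} minutes exceeds threshold of {duration_threshold}"
        )
    return reasons


if __name__ == "__main__":
    run = JobRun("nightly-etl", "SUCCEEDED", workers=20, duration_minutes=35)
    print(alert_reasons(run))
```

Returning a list of reasons rather than a Boolean makes it straightforward to include the reason for alerting in the notification email.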

Configure weekly reporting

As mentioned previously, when an AWS Glue for Apache Spark job succeeds, fails, times out, or is stopped, EventBridge forwards this event to Lambda, where it logs specific information about each job run. Once a week, a separate Lambda function queries DynamoDB and aggregates your job runs to provide meaningful insights and recommendations about your AWS Glue for Apache Spark environment. This report is sent via email with a tabular structure as shown in the following screenshot. It’s meant for top-level visibility so you’re able to see your longest job runs over time, jobs that have had many retries, failures, and more. It also provides an overall cost calculation as an estimate of what each AWS Glue job will cost for that week. It should not be used as a guaranteed cost. If you would like to see exact cost per job, the AWS Cost and Usage Report is the best resource to use. The following screenshot shows one table (of five total) from the AWS Glue report function.

weekly report
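To make the cost estimate concrete, the following Python sketch aggregates job runs into an estimated weekly cost per job. It assumes a flat price of 0.44 USD per DPU-hour, which is a placeholder to replace with the rate for your Region, and it bills for the full provisioned worker count; with auto scaling, actual usage and cost can be lower. The function and field names are hypothetical and are not taken from the solution's code.

```python
from collections import defaultdict

# DPUs per worker for the Spark worker types, from the worker type table.
DPUS_PER_WORKER = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8}

# Assumption: placeholder price; check AWS Glue pricing for your Region.
PRICE_PER_DPU_HOUR_USD = 0.44


def estimate_weekly_cost(job_runs, price=PRICE_PER_DPU_HOUR_USD):
    """Sum the estimated cost of each job's runs, keyed by job name."""
    totals = defaultdict(float)
    for run in job_runs:
        dpus = DPUS_PER_WORKER[run["worker_type"]] * run["workers"]
        dpu_hours = dpus * run["duration_minutes"] / 60
        totals[run["job_name"]] += dpu_hours * price
    return dict(totals)


if __name__ == "__main__":
    runs = [
        {"job_name": "orders-etl", "worker_type": "G.1X", "workers": 10, "duration_minutes": 90},
        {"job_name": "orders-etl", "worker_type": "G.2X", "workers": 5, "duration_minutes": 60},
        {"job_name": "clicks-etl", "worker_type": "G.1X", "workers": 2, "duration_minutes": 30},
    ]
    for job_name, cost in estimate_weekly_cost(runs).items():
        print(f"{job_name}: ~${cost:.2f}")
```

As with the report itself, treat the result as an estimate and use the AWS Cost and Usage Report for exact figures.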

Clean up

If you don’t want to run the solution anymore, delete the AWS SAM application for each account that it was provisioned in. To delete your AWS SAM stack, run the following command from your project directory:

sam delete

Conclusion

In this post, we discussed how you can monitor and cost-optimize your AWS Glue job configurations to comply with organizational standards and policy. This method can provide cost controls over AWS Glue jobs across your organization. Some other ways to help control the costs of your AWS Glue for Apache Spark jobs include the newly released AWS Glue Flex jobs and Auto Scaling. We also provided an AWS SAM application as a solution to deploy into your accounts. We encourage you to review the resources provided in this post to continue learning about AWS Glue. To learn more about monitoring and optimizing for cost using AWS Glue, please visit this recent blog. It goes in depth on all of the cost optimization options and includes a template that builds a CloudWatch dashboard for you with metrics about all of your Glue job runs.


About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers in the south east modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Angus Ferguson is a Solutions Architect at AWS who is passionate about meeting customers across the world, helping them solve their technical challenges. Angus specializes in Data & Analytics with a focus on customers in the financial services industry.

Defontana provides business administration solutions to Latin American customers using Amazon QuickSight

Post Syndicated from Cynthia Valeriano original https://aws.amazon.com/blogs/big-data/defontana-provides-business-administration-solutions-to-latin-american-customers-using-amazon-quicksight/

This is a guest post by Cynthia Valeriano, Jaime Olivares, and Guillermo Puelles from Defontana.

Defontana develops fully cloud-based business applications for the administration and management of companies. Based in Santiago, Chile, with operations in Peru, Mexico, and most recently Colombia, our main product is a 100% cloud-based enterprise resource planning (ERP) system that has been providing value to our business customers for about 20 years. In addition to our core ERP product, we have developed integration modules for ecommerce, banks, financial and regulatory institutions, digital signatures, business reports, and many other solutions. Our goal is to continue building solutions for customers and make Defontana the best business administration tool in Chile and Latin America.

Most of our customers are small and medium businesses (SMBs) who need to optimize their resources. Our ERP system helps customers manage cash flow, time, human resources (HR), and other resources. As we were exploring how to continue innovating for our customers and looking for an embedded analytics solution, we chose Amazon QuickSight, which allows us to seamlessly integrate data-driven experiences into our web application.

In this post, we discuss how QuickSight has sped up our development time, enabled us to provide more value to our customers, and even improved our own internal operations.

Saving development time by embedding QuickSight

We built our ERP service as a web application from the beginning. Ten years ago, this was a big differentiator for us, but to continue to serve the SMBs that trust us for information on a daily basis, we wanted to offer even more advanced analytics. By embedding QuickSight into our web application, we have been able to provide business intelligence (BI) functionalities to customers two or three times faster than we would have if we had opted for libraries for generating HTML reports. Thanks to our embedded QuickSight solution, we are able to focus more of our energy on analyzing the requirements and functionalities that we want to offer our customers in each BI report.

The following screenshots show our ERP service, accessed on a web browser, with insights and rich visualizations by QuickSight.

We enjoy using QuickSight because of how well it integrates with other AWS services. Our data is stored in a legacy relational database management system (RDBMS), Amazon Aurora, and Amazon DynamoDB. We are in the process of moving away from that legacy RDBMS to PostgreSQL through a Babelfish for Aurora PostgreSQL project. This will allow us to reduce costs while also being able to use a multi-Region database with disaster recovery in the future, which would have been too expensive with the legacy RDBMS. To seamlessly transfer data from these databases to Amazon Simple Storage Service (Amazon S3), we use AWS Database Migration Service (AWS DMS). Then, AWS Glue allows us to generate several extract, transform, and load (ETL) processes to prepare the data in Amazon S3 to be used in QuickSight. Finally, we use Amazon Athena to generate views to be used as base information in QuickSight.

Providing essential insights for SMB customers

QuickSight simplifies the generation of dashboards. We have made several out-of-the-box dashboards in QuickSight for our customers that they can use directly on our web app right after they sign up. These dashboards provide insights on sales, accounting, cash flow, financial information, and customer clusters based on their data in our ERP service. These free-to-use reports can be used by all customers in the system. We also have add-on dashboards that can be activated by any user of the system for a trial period. Since we launched the add-on dashboards, more than 300 companies have activated them, with over 100 of them choosing to continue using them after the free trial.

Besides generic reports, we have created several tailor-made dashboards according to the specific requirements of each customer. These are managed through a customer-focused development process by our engineering team according to the specifications of each customer. With this option, our customers can get reports on accounts payable, accounts receivable, supply information (purchase order flow, receipts and invoices sent by suppliers), inventory details, and more. We have more than 50 customers who have worked with us on tailor-made dashboards. With the broad range of functionalities within QuickSight, we can offer many data visualization options to our customers.

Empowering our own operations

Beyond using QuickSight to serve our customers, we also use QuickSight for our own BI reporting. So far, we have generated more than 80 dashboards to analyze different business flows. For example, we monitor daily sales in specific services, accounting, software as a service (SaaS) metrics, and the operation of our customers. We do all of this from within our own web application, with the power of QuickSight, giving us the opportunity to experience the interface just like our customers do. In 2023, one of our top goals is to provide a 360-degree view of Defontana using QuickSight.

Conclusion

QuickSight has enabled us to seamlessly embed analytics into our ERP service, providing valuable insights to our SMB customers. We have been able to cut costs and continue to grow throughout Latin America. We plan to use QuickSight even more within our own organization, making us more data-driven. QuickSight will empower us to democratize the information that our own employees receive, establish better processes, and create more tools to analyze customer information for behavioral patterns, which we can use to better meet our customers’ needs.

To learn more about how you can embed customized data visuals and interactive dashboards into any application, visit Amazon QuickSight Embedded.


About the authors

Cynthia Valeriano is a Business Intelligence Developer at Defontana, with skills focused on data analysis and visualization. With 3 years of experience in administrative areas and 2 years of experience in business intelligence projects, she has been in charge of implementing data migration and transformation tasks with various AWS tools, such as AWS DMS and AWS Glue, in addition to generating multiple dashboards in Amazon QuickSight.

Jaime Olivares is a Senior Software Developer at Defontana, with 6 years of experience in the development of various technologies focused on the analysis of solutions and customer requirements. He has experience with various AWS services, including product development through QuickSight for the analysis of business and accounting data.

Guillermo Puelles is a Technical Manager of the “Appia” Integrations team at Defontana, with 9 years of experience in software development and 5 years working with AWS tools. He is responsible for planning and managing various projects for the implementation of BI solutions through QuickSight and other AWS services.

Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/improve-operational-efficiencies-of-apache-iceberg-tables-built-on-amazon-s3-data-lakes/

Apache Iceberg is an open table format for large datasets in Amazon Simple Storage Service (Amazon S3) and provides fast query performance over large tables, atomic commits, concurrent writes, and SQL-compatible table evolution. When you build your transactional data lake using Apache Iceberg to solve your functional use cases, you need to focus on operational use cases for your S3 data lake to optimize the production environment. Some of the important non-functional use cases for an S3 data lake that organizations are focusing on include storage cost optimizations, capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake, and handling increased Amazon S3 request rates.

In this post, we show you how to improve operational efficiencies of your Apache Iceberg tables built on an Amazon S3 data lake and the Amazon EMR big data platform.

Optimize data lake storage

One of the major advantages of building modern data lakes on Amazon S3 is it offers lower cost without compromising on performance. You can use Amazon S3 Lifecycle configurations and Amazon S3 object tagging with Apache Iceberg tables to optimize the cost of your overall data lake storage. An Amazon S3 Lifecycle configuration is a set of rules that define actions that Amazon S3 applies to a group of objects. There are two types of actions:

  • Transition actions – These actions define when objects transition to another storage class; for example, Amazon S3 Standard to Amazon S3 Glacier.
  • Expiration actions – These actions define when objects expire. Amazon S3 deletes expired objects on your behalf.

Amazon S3 uses object tagging to categorize storage, where each tag is a key-value pair. Apache Iceberg supports custom Amazon S3 object tags that can be added to S3 objects when writing to and deleting from the table. Iceberg also lets you configure a tag-based object lifecycle policy at the bucket level to transition objects to different Amazon S3 tiers. With the s3.delete.tags config property in Iceberg, objects are tagged with the configured key-value pairs before deletion. When the catalog property s3.delete-enabled is set to false, the objects are not hard-deleted from Amazon S3. This is expected to be used in combination with Amazon S3 delete tagging, so objects are tagged and then removed using an Amazon S3 lifecycle policy. The s3.delete-enabled property is set to true by default.

The example notebook in this post shows an example implementation of S3 object tagging and lifecycle rules for Apache Iceberg tables to optimize storage cost.
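As a minimal sketch of how these properties fit together, the following Python builds the Spark catalog settings for write and delete tagging and a matching tag-based lifecycle rule. The catalog name my_catalog, the tag keys and values, and the 7-day expiration are illustrative assumptions. The rule is only constructed as a dictionary in the shape used by S3 lifecycle configurations; applying it to a bucket is left out.

```python
def iceberg_s3_tag_conf(catalog, write_tags, delete_tags):
    """Build Spark conf entries for Iceberg S3 tagging on one catalog."""
    prefix = f"spark.sql.catalog.{catalog}"
    # Disable hard deletes so that deleted objects are only tagged.
    conf = {f"{prefix}.s3.delete-enabled": "false"}
    for key, value in write_tags.items():
        conf[f"{prefix}.s3.write.tags.{key}"] = value
    for key, value in delete_tags.items():
        conf[f"{prefix}.s3.delete.tags.{key}"] = value
    return conf


def expire_tagged_objects_rule(tag_key, tag_value, days):
    """Build one S3 lifecycle rule that expires objects carrying a tag."""
    return {
        "ID": f"expire-{tag_key}-{tag_value}",
        "Filter": {"Tag": {"Key": tag_key, "Value": tag_value}},
        "Status": "Enabled",
        "Expiration": {"Days": days},
    }


if __name__ == "__main__":
    # Hypothetical tag names and values, chosen only for this example.
    conf = iceberg_s3_tag_conf(
        "my_catalog",
        write_tags={"write-tag-name": "created"},
        delete_tags={"delete-tag-name": "deleted"},
    )
    for key, value in sorted(conf.items()):
        print(f"--conf {key}={value}")
    print(expire_tagged_objects_rule("delete-tag-name", "deleted", days=7))
```

The lifecycle rule filters on the same key-value pair that Iceberg applies before deletion, so Amazon S3 removes the tagged objects after the configured number of days.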

Implement business continuity

Amazon S3 gives any developer access to the same highly scalable, reliable, fast, inexpensive data storage infrastructure that Amazon uses to run its own global network of web sites. Amazon S3 is designed for 99.999999999% (11 9’s) of durability, S3 Standard is designed for 99.99% availability, and Standard – IA is designed for 99.9% availability. Still, to make your data lake workloads highly available in an unlikely outage situation, you can replicate your S3 data to another AWS Region as a backup. With S3 data residing in multiple Regions, you can use an S3 multi-Region access point as a solution to access the data from the backup Region. With Amazon S3 multi-Region access point failover controls, you can route all S3 data request traffic through a single global endpoint and directly control the shift of S3 data request traffic between Regions at any time. During a planned or unplanned regional traffic disruption, failover controls let you control failover between buckets in different Regions and accounts within minutes. Apache Iceberg supports access points to perform S3 operations by specifying a mapping of bucket to access points. We include an example implementation of an S3 access point with Apache Iceberg later in this post.

Increase Amazon S3 performance and throughput

Amazon S3 supports a request rate of 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. The resources for this request rate aren’t automatically assigned when a prefix is created. Instead, as the request rate for a prefix increases gradually, Amazon S3 automatically scales to handle the increased request rate. For certain workloads that need a sudden increase in the request rate for objects in a prefix, Amazon S3 might return 503 Slow Down errors, also known as S3 throttling. It does this while it scales in the background to handle the increased request rate. Also, if supported request rates are exceeded, it’s a best practice to distribute objects and requests across multiple prefixes. Implementing this solution to distribute objects and requests across multiple prefixes involves changes to your data ingress or data egress applications. Using the Apache Iceberg table format for your S3 data lake can significantly reduce the engineering effort by enabling the ObjectStoreLocationProvider feature, which adds an S3 hash [0*7FFFFF] prefix in your specified S3 object path.

Iceberg by default uses the Hive storage layout, but you can switch it to use the ObjectStoreLocationProvider. This option is not enabled by default to provide flexibility to choose the location where you want to add the hash prefix. With ObjectStoreLocationProvider, a deterministic hash is generated for each stored file and a subfolder is appended right after the S3 folder specified using the parameter write.data.path (write.object-storage.path for Iceberg version 0.12 and below). This ensures that files written to Amazon S3 are evenly distributed across multiple prefixes in your S3 bucket, thereby minimizing throttling errors. In the following example, we set the write.data.path value as s3://my-table-data-bucket, and Iceberg-generated S3 hash prefixes will be appended after this location:

CREATE TABLE my_catalog.my_ns.my_table
( id bigint,
data string,
category string)
USING iceberg OPTIONS
( 'write.object-storage.enabled'=true,
'write.data.path'='s3://my-table-data-bucket')
PARTITIONED BY (category);

Your S3 files will be arranged under MURMUR3 S3 hash prefixes like the following:

2021-11-01 05:39:24 809.4 KiB 7ffbc860/my_ns/my_table/00328-1642-5ce681a7-dfe3-4751-ab10-37d7e58de08a-00015.parquet
2021-11-01 06:00:10 6.1 MiB 7ffc1730/my_ns/my_table/00460-2631-983d19bf-6c1b-452c-8195-47e450dfad9d-00001.parquet
2021-11-01 04:33:24 6.1 MiB 7ffeeb4e/my_ns/my_table/00156-781-9dbe3f08-0a1d-4733-bd90-9839a7ceda00-00002.parquet
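
To build intuition for how a hashed prefix spreads files across the S3 key space, the following is a minimal Python sketch. It is not Iceberg's ObjectStoreLocationProvider code: MD5 from the standard library stands in for the Murmur3 hash, and the function name hashed_key and the eight-character prefix width are assumptions made for illustration only.

```python
import hashlib


def hashed_key(namespace: str, table: str, file_name: str) -> str:
    """Return an object key that starts with a deterministic hash prefix.

    MD5 is a stand-in for Murmur3 (assumption); only the idea of a
    deterministic per-file prefix is taken from the post.
    """
    digest = hashlib.md5(file_name.encode("utf-8")).hexdigest()
    prefix = digest[:8]  # eight hex characters, as in the listing above
    return f"{prefix}/{namespace}/{table}/{file_name}"


# Hypothetical file names, used only to exercise the function.
files = [f"{i:05d}-data.parquet" for i in range(1000)]
keys = [hashed_key("my_ns", "my_table", name) for name in files]

# Count how many distinct leading characters the generated keys use.
leading = {key[0] for key in keys}
print(len(leading), "distinct leading characters across", len(keys), "keys")
```

Because the prefix depends only on the file name, the same file always maps to the same key, while different files land under different prefixes.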

Using Iceberg ObjectStoreLocationProvider is not a foolproof mechanism to avoid S3 503 errors. You still need to set appropriate EMRFS retries to provide additional resiliency. You can adjust your retry strategy by increasing the maximum retry limit for the default exponential backoff retry strategy or enabling and configuring the additive-increase/multiplicative-decrease (AIMD) retry strategy. AIMD is supported for Amazon EMR releases 6.4.0 and later. For more information, refer to Retry Amazon S3 requests with EMRFS.
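
The idea behind AIMD can be illustrated with a small, self-contained Python sketch. This is not the EMRFS implementation: the class name AimdRateController and the parameters increase, decrease_factor, and min_rate are hypothetical, chosen only to show how the request rate grows in small steps and is cut sharply after a throttling response.

```python
class AimdRateController:
    """Toy additive-increase/multiplicative-decrease request-rate controller."""

    def __init__(self, initial_rate: float, increase: float = 10.0,
                 decrease_factor: float = 0.5, min_rate: float = 1.0):
        self.rate = initial_rate
        self.increase = increase
        self.decrease_factor = decrease_factor
        self.min_rate = min_rate

    def on_success(self) -> float:
        # Additive increase: probe for more capacity in small steps.
        self.rate += self.increase
        return self.rate

    def on_throttle(self) -> float:
        # Multiplicative decrease: back off sharply after a 503 Slow Down.
        self.rate = max(self.min_rate, self.rate * self.decrease_factor)
        return self.rate


controller = AimdRateController(initial_rate=100.0)
controller.on_success()   # rate becomes 110.0
controller.on_success()   # rate becomes 120.0
controller.on_throttle()  # rate is halved to 60.0
print(controller.rate)
```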

In the following sections, we provide examples for these use cases.

Storage cost optimizations

In this example, we use Iceberg’s S3 tags feature with the write tag as write-tag-name=created and delete tag as delete-tag-name=deleted. This example is demonstrated on an EMR version emr-6.10.0 cluster with installed applications Hadoop 3.3.3, Jupyter Enterprise Gateway 2.6.0, and Spark 3.3.1. The examples are run on a Jupyter Notebook environment attached to the EMR cluster. To learn more about how to create an EMR cluster with Iceberg and use Amazon EMR Studio, refer to Use an Iceberg cluster with Spark and the Amazon EMR Studio Management Guide, respectively.

The following examples are also available in the sample notebook in the aws-samples GitHub repo for quick experimentation.

Configure Iceberg on a Spark session

Configure your Spark session using the %%configure magic command. You can use either the AWS Glue Data Catalog (recommended) or a Hive catalog for Iceberg tables. In this example, we use a Hive catalog, but we can change to the Data Catalog with the following configuration:

spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog

Before you run this step, create an S3 bucket and an iceberg folder in your AWS account with the naming convention <your-iceberg-storage-blog>/iceberg/.

Update your-iceberg-storage-blog in the following configuration with the bucket that you created to test this example. Note the configuration parameters s3.write.tags.write-tag-name and s3.delete.tags.delete-tag-name, which will tag the new S3 objects and deleted objects with corresponding tag values. We use these tags in later steps to implement S3 lifecycle policies to transition the objects to a lower-cost storage tier or expire them based on the use case.

%%configure -f
{
  "conf": {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "spark.sql.catalog.dev.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.dev.warehouse": "s3://<your-iceberg-storage-blog>/iceberg/",
    "spark.sql.catalog.dev.s3.write.tags.write-tag-name": "created",
    "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name": "deleted",
    "spark.sql.catalog.dev.s3.delete-enabled": "false"
  }
}

Create an Apache Iceberg table using Spark-SQL

Now we create an Iceberg table for the Amazon Product Reviews Dataset:

spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")
spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg (
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
star_rating int,
helpful_votes int,
total_votes int,
vine string,
verified_purchase string,
review_headline string,
review_body string,
review_date date,
year int)
USING iceberg
location 's3://<your-iceberg-storage-blog>/iceberg/db/amazon_reviews_iceberg'
PARTITIONED BY (years(review_date))""")

In the next step, we load the table with the dataset using Spark actions.

Load data into the Iceberg table

While inserting the data, we partition the data by review_date as per the table definition. Run the following Spark commands in your PySpark notebook:

df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")

df.sortWithinPartitions("review_date").writeTo("dev.db.amazon_reviews_iceberg").append()

Insert a single record into the same Iceberg table so that it creates a partition with the current review_date:

spark.sql("""insert into dev.db.amazon_reviews_iceberg values ("US", "99999999","R2RX7KLOQQ5VBG","B00000JBAT","738692522","Diamond Rio Digital",3,0,0,"N","N","Why just 30 minutes?","RIO is really great",date("2023-04-06"),2023)""")

You can check that a new snapshot was created after this append operation by querying the Iceberg snapshots metadata table:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

The output lists the table's snapshots along with the operation that produced each one.

Check the S3 tag population

You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to check the tags populated for the new writes. Let’s check the tag corresponding to the object created by a single row insert.

On the Amazon S3 console, check the S3 folder s3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/data/ and point to the partition review_date_year=2023/. Then check the Parquet file under this folder to check the tags associated with the data file in Parquet format.

From the AWS CLI, run the following command to see that the tag is created based on the Spark configuration "spark.sql.catalog.dev.s3.write.tags.write-tag-name":"created":

aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the associated tags for the file:

{ "VersionId": "null", "TagSet": [{ "Key": "write-tag-name", "Value": "created" } ] }
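
The same check can be scripted. The following Python sketch wraps the S3 GetObjectTagging call (exposed by boto3 as get_object_tagging) in two small helpers. The helper names object_tags and has_tag are hypothetical, and the functions accept any client object that offers get_object_tagging, so they can be tried without AWS access.

```python
def object_tags(s3_client, bucket: str, key: str) -> dict:
    """Return the tags of one S3 object as a plain {key: value} dict."""
    response = s3_client.get_object_tagging(Bucket=bucket, Key=key)
    return {tag["Key"]: tag["Value"] for tag in response["TagSet"]}


def has_tag(s3_client, bucket: str, key: str, name: str, value: str) -> bool:
    """Check whether the object carries the tag name=value."""
    return object_tags(s3_client, bucket, key).get(name) == value


# Example usage (requires boto3 and AWS credentials; the bucket and key
# below are placeholders to replace with your own values):
#
#   import boto3
#   s3 = boto3.client("s3")
#   print(has_tag(s3, "your-iceberg-storage-blog",
#                 "iceberg/db/amazon_reviews_iceberg/data/<file>.parquet",
#                 "write-tag-name", "created"))
```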

Delete a record and expire a snapshot

In this step, we delete a record from the Iceberg table and expire the snapshot corresponding to the deleted record. We delete the new single record that we inserted with the current review_date:

spark.sql("""delete from dev.db.amazon_reviews_iceberg where review_date = '2023-04-06'""")

We can now check that a new snapshot was created with the operation flagged as delete:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

This is useful if we want to time travel and check the deleted row in the future. In that case, we have to query the table with the snapshot-id corresponding to the deleted row. However, we don’t discuss time travel as part of this post.

We expire the old snapshots from the table and keep only the last two. You can modify the query based on your specific requirements to retain the snapshots:

spark.sql ("""CALL dev.system.expire_snapshots(table => 'dev.db.amazon_reviews_iceberg', older_than => DATE '2024-01-01', retain_last => 2)""")

If we run the same query on the snapshots, we can see that we have only two snapshots available:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

From the AWS CLI, you can run the following command to see that the tag is created based on the Spark configuration "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name":"deleted":

aws s3api get-object-tagging --bucket avijit-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the associated tags for the file:

{ "VersionId": "null", "TagSet": [ { "Key": "delete-tag-name", "Value": "deleted" }, { "Key": "write-tag-name", "Value": "created" } ] }

You can view the existing metadata files by querying the metadata_log_entries metadata table after the snapshots have expired:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.metadata_log_entries""").show()

The snapshots that have expired show the latest snapshot ID as null.

Create S3 lifecycle rules to transition the objects to a different storage tier

Create a lifecycle configuration for the bucket to transition objects with the delete-tag-name=deleted S3 tag to the S3 Glacier Instant Retrieval storage class. Amazon S3 runs lifecycle rules one time every day at midnight Coordinated Universal Time (UTC), and new lifecycle rules can take up to 48 hours to complete the first run. S3 Glacier Instant Retrieval is well suited to archive data that needs immediate access (with retrieval in milliseconds). With S3 Glacier Instant Retrieval, you can save up to 68% on storage costs compared to using the S3 Standard-Infrequent Access (S3 Standard-IA) storage class, when the data is accessed once per quarter.
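
As a rough sketch of what such a rule could look like, the following Python snippet builds a lifecycle configuration in the shape accepted by the S3 PutBucketLifecycleConfiguration API (put_bucket_lifecycle_configuration in boto3). The helper name glacier_ir_rule, the rule ID, and the 30-day transition delay are assumptions chosen for illustration; adjust them to your retention requirements.

```python
import json


def glacier_ir_rule(tag_key: str, tag_value: str, days: int) -> dict:
    """Build a lifecycle configuration that moves tagged objects to
    S3 Glacier Instant Retrieval after the given number of days."""
    return {
        "Rules": [
            {
                "ID": f"transition-{tag_value}-objects",  # hypothetical rule name
                "Status": "Enabled",
                # Only objects carrying this tag are affected by the rule.
                "Filter": {"Tag": {"Key": tag_key, "Value": tag_value}},
                "Transitions": [{"Days": days, "StorageClass": "GLACIER_IR"}],
            }
        ]
    }


# 30 days is an arbitrary example value, not a recommendation from the post.
config = glacier_ir_rule("delete-tag-name", "deleted", days=30)
print(json.dumps(config, indent=2))

# To apply it (requires boto3 and AWS credentials; bucket is a placeholder):
#
#   import boto3
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket="your-iceberg-storage-blog", LifecycleConfiguration=config)
```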

When you need to access the data again, you can bulk restore the archived objects. After you restore the objects to the S3 Standard class, you can register the metadata and data as an archival table for query purposes. The metadata file location can be fetched from the metadata_log_entries metadata table as illustrated earlier. As mentioned before, entries whose latest snapshot ID is null indicate expired snapshots. We can take the metadata file of one of the expired snapshots and, after the bulk restore, register it as an archival table:

spark.sql("""CALL dev.system.register_table(table => 'db.amazon_reviews_iceberg_archive', metadata_file => 's3://avijit-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/metadata/00000-a010f15c-7ac8-4cd1-b1bc-bba99fa7acfc.metadata.json')""").show()

Capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake

Because Iceberg doesn’t support relative paths, you can use access points to perform Amazon S3 operations by specifying a mapping of buckets to access points. This is useful for multi-Region access, cross-Region access, disaster recovery, and more.

For cross-Region access points, we also need to set the use-arn-region-enabled catalog property to true so that S3FileIO can make cross-Region calls. If an Amazon S3 resource ARN passed in as the target of an Amazon S3 operation has a different Region than the one the client was configured with, this flag must be set to true to permit the client to make a cross-Region call to the Region specified in the ARN; otherwise, an exception is thrown. However, for same-Region or multi-Region access points, the use-arn-region-enabled flag should be set to false.

For example, to use an S3 access point with multi-Region access in Spark 3.3, you can start the Spark SQL shell with the following code:

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket2=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap

In this example, the objects in Amazon S3 on my-bucket1 and my-bucket2 buckets use the arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap access point for all Amazon S3 operations.

For more details on using access points, refer to Using access points with compatible Amazon S3 operations.

Let’s say your table path is under my-bucket1, so both my-bucket1 in Region 1 and my-bucket2 in Region 2 have paths of my-bucket1 inside the metadata files. At the time of the S3 (GET/PUT) call, we replace the my-bucket1 reference with a multi-Region access point.
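
The substitution can be pictured with a short Python sketch. It is an illustration of the mapping idea only, not the S3FileIO code; the function name resolve_bucket is hypothetical, and the ARN is the example value from the Spark configuration above.

```python
def resolve_bucket(s3_uri: str, access_points: dict) -> tuple:
    """Split an s3:// URI and swap the bucket for its mapped access point.

    Returns (target, key). Buckets without a mapping are returned unchanged.
    """
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"not an S3 URI: {s3_uri}")
    bucket, _, key = s3_uri[len("s3://"):].partition("/")
    return access_points.get(bucket, bucket), key


MRAP = "arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap"
mapping = {"my-bucket1": MRAP, "my-bucket2": MRAP}

# The path stored in the metadata files still names my-bucket1, but the
# request is addressed to the multi-Region access point instead.
target, key = resolve_bucket(
    "s3://my-bucket1/my/key/prefix/data/file.parquet", mapping)
print(target)
print(key)
```

The object key is untouched; only the bucket part of the request target changes, which is why the metadata files do not need to be rewritten per Region.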

Handling increased S3 request rates

When using ObjectStoreLocationProvider (for more details, see Object Store File Layout), a deterministic hash is generated for each stored file, with the hash appended directly after the write.data.path. The problem with this is that the default hashing algorithm generates hash values up to Integer MAX_VALUE, which in Java is (2^31)-1. When this is converted to hex, it produces 0x7FFFFFFF, so the first character variance is restricted to only [0-7]. As per Amazon S3 recommendations, the leading characters of a prefix should have the maximum possible variance to mitigate throttling.
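
The limited variance is easy to confirm with a few lines of Python. The sketch below only works through the arithmetic on the hash range stated above; the sampling step of 2^20 is an arbitrary choice that keeps the loop small.

```python
MAX_INT = 2**31 - 1  # Java's Integer.MAX_VALUE

# The largest possible hash value, rendered as eight hex digits.
print(format(MAX_INT, "08x"))

# Sample evenly spaced values over the full hash range and collect the
# first hex character of each eight-digit rendering.
samples = range(0, MAX_INT + 1, 2**20)
first_chars = sorted({format(value, "08x")[0] for value in samples})
print(first_chars)
```

Only eight of the sixteen possible hex characters ever appear in the first position, so half of the available key space for the leading character goes unused.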

Starting from Amazon EMR 6.10, Amazon EMR added an optimized location provider that makes sure the generated prefix hash has uniform distribution in the first two characters using the character set from [0-9][A-Z][a-z].

This location provider has been recently open sourced by Amazon EMR via Core: Improve bit density in object storage layout and should be available starting from Iceberg 1.3.0.

To use it, make sure the iceberg.enabled classification is set to true, and write.location-provider.impl is set to org.apache.iceberg.emr.OptimizedS3LocationProvider.

The following is a sample Spark shell command:

spark-shell --conf spark.driver.memory=4g \
--conf spark.executor.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/iceberg-V516168123 \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.table-override.write.location-provider.impl=org.apache.iceberg.emr.OptimizedS3LocationProvider

The following example shows that when you enable the object storage in your Iceberg table, it adds the hash prefix in your S3 path directly after the location you provide in your DDL.

Set the table property write.object-storage.enabled to true and provide the S3 path after which you want the hash prefix added, using the write.data.path parameter (for Iceberg version 0.13 and above) or the write.object-storage.path parameter (for Iceberg version 0.12 and below).

Insert data into the table you created.

The hash prefix is added right after the /current/ prefix in the S3 path as defined in the DDL.

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.

Conclusion

As companies continue to build newer transactional data lake use cases using Apache Iceberg open table format on very large datasets on S3 data lakes, there will be an increased focus on optimizing those petabyte-scale production environments to reduce cost, improve efficiency, and implement high availability. This post demonstrated mechanisms to implement the operational efficiencies for Apache Iceberg open table formats running on AWS.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Prashant Singh is a Software Development Engineer at AWS. He is interested in Databases and Data Warehouse engines and has worked on Optimizing Apache Spark performance on EMR. He is an active contributor in open source projects like Apache Spark and Apache Iceberg. During his free time, he enjoys exploring new places, food and hiking.

Real-time time series anomaly detection for streaming applications on Amazon Kinesis Data Analytics

Post Syndicated from Antonio Vespoli original https://aws.amazon.com/blogs/big-data/real-time-time-series-anomaly-detection-for-streaming-applications-on-amazon-kinesis-data-analytics/

Detecting anomalies in real time from high-throughput streams is key to making timely decisions in order to adapt and respond to unexpected scenarios. Stream processing frameworks such as Apache Flink empower users to design systems that can ingest and process continuous flows of data at scale. In this post, we present a streaming time series anomaly detection algorithm based on matrix profiles and left-discords, inspired by Lu et al., 2022, with Apache Flink, and provide a working example that will help you get started on a managed Apache Flink solution using Amazon Kinesis Data Analytics.

Challenges of anomaly detection

Anomaly detection plays a key role in a variety of real-world applications, such as fraud detection, sales analysis, cybersecurity, predictive maintenance, and fault detection, among others. The majority of these use cases require actions to be taken in near-real time. For instance, card payment networks must be able to identify and reject potentially fraudulent transactions before processing them. This raises the challenge of designing near-real-time anomaly detection systems that are able to scale to ultra-fast arriving data streams.

Another key challenge that anomaly detection systems face is concept drift. The ever-changing nature of some use cases requires models to dynamically adapt to new scenarios. For instance, in a predictive maintenance scenario, you could use several Internet of Things (IoT) devices to monitor the vibrations produced by an electric motor with the objective of detecting anomalies and preventing unrecoverable damage. Sounds emitted by the vibrations of the motor can vary significantly over time due to different environmental conditions such as temperature variations, and this shift in pattern can invalidate the model. This class of scenarios creates the necessity for online learning—the ability of the model to continuously learn from new data.

Time series anomaly detection

Time series are a particular class of data that incorporates time in their structuring. The data points that characterize a time series are recorded in an orderly fashion and are chronological in nature. This class of data is present in every industry and is common at the core of many business requirements or key performance indicators (KPIs). Natural sources of time series data include credit card transactions, sales, sensor measurements, machine logs, and user analytics.

In the time series domain, an anomaly can be defined as a deviation from the expected patterns that characterize the time series. For instance, a time series can be characterized by its expected ranges, trends, seasonal, or cyclic patterns. Any significant alteration of this normal flow of data points is considered an anomaly.

Detecting anomalies can be more or less challenging depending on the domain. For instance, a threshold-based approach might be suitable for time series whose expected ranges are known, such as the working temperature of a machine or CPU utilization. On the other hand, applications such as fraud detection, cybersecurity, and predictive maintenance can’t be classified via simple rule-based approaches and require a more fine-grained mechanism to capture unexpected observations. Thanks to their parallelizable and event-driven setup, streaming engines such as Apache Flink provide an excellent environment for scaling real-time anomaly detection to fast-arriving data streams.

Solution overview

Apache Flink is a distributed processing engine for stateful computations over streams. A Flink program can be implemented in Java, Scala, or Python. It supports ingestion, manipulation, and delivery of data to the desired destinations. Kinesis Data Analytics allows you to run Flink applications in a fully managed environment on AWS.

Distance-based anomaly detection is a popular approach where a model is characterized by a number of internally stored data points that are used for comparison against the new incoming data points. At inference time, these methods compute distances and classify new data points according to how dissimilar they are from the past observations. In spite of the plethora of algorithms in literature, there is increasing evidence that distance-based anomaly detection algorithms are still competitive with the state of the art (Nakamura et al., 2020).

In this post, we present a streaming version of a distance-based unsupervised anomaly detection algorithm called time series discords, and explore some of the optimizations introduced by the Discord Aware Matrix Profile (DAMP) algorithm (Lu et al., 2022), which further develops the discords method to scale to trillions of data points.

Understanding the algorithm

A left-discord is a subsequence that is significantly dissimilar from all the subsequences that precede it. In this post, we demonstrate how to use the concept of left-discords to identify time series anomalies in streams using Kinesis Data Analytics for Apache Flink.

Let’s consider an unbounded stream and all its subsequences of length n. The m most recent subsequences will be stored and used for inference. When a new data point arrives, a new subsequence that includes the new event is formed. The algorithm compares this latest subsequence (query) to the m subsequences retained from the model, with the exclusion of the latest n subsequences because they overlap with the query and would therefore characterize a self-match. After computing these distances, the algorithm classifies the query as an anomaly if its distance from its closest non-self-matching subsequence is above a certain moving threshold.
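
The comparison step can be sketched in a few lines of Python. This is a simplified illustration rather than the code from the accompanying repository: it uses a plain Euclidean distance (an assumption, because the distance measure isn't spelled out here), and the function names are hypothetical.

```python
import math


def euclidean(a, b):
    """Plain Euclidean distance between two equal-length sequences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def nearest_left_distance(series, n, m):
    """Distance from the latest length-n subsequence (the query) to its
    closest non-self-matching predecessor among the m most recent
    subsequences. Returns None when there is not enough history."""
    last_start = len(series) - n
    query = series[last_start:]
    # Start positions of the m most recent subsequences, query included.
    first_start = max(0, last_start - m + 1)
    # Skip the latest n start positions: they overlap with the query.
    candidates = range(first_start, last_start - n + 1)
    if len(candidates) == 0:
        return None
    return min(euclidean(series[s:s + n], query) for s in candidates)


regular = [0, 1] * 6             # a perfectly periodic stream
disturbed = [0, 1] * 5 + [5, 5]  # the same stream ending in a jump

print(nearest_left_distance(regular, n=4, m=8))
print(nearest_left_distance(disturbed, n=4, m=8))
```

For the periodic stream the query has an exact earlier match, so the distance is zero; for the disturbed stream no earlier subsequence comes close, which is what flags it as a left-discord candidate once the distance exceeds the threshold.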

For this post, we use a Kinesis data stream to ingest the input data, a Kinesis Data Analytics application to run the Flink anomaly detection program, and another Kinesis data stream to ingest the output produced by your application. For visualization purposes, we consume from the output stream using Kinesis Data Analytics Studio, which provides an Apache Zeppelin Notebook that we use to visualize and interact with the data in real time.

Implementation details

The Java application code for this example is available on GitHub. To download the application code, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords directory.

Let’s walk through the code step by step.

The MPStreamingJob class defines the data flow of the application, and the MPProcessFunction class defines the logic of the function that detects anomalies.

The implementation is best described by three core components:

  • The Kinesis data stream source, used to read from the input stream
  • The anomaly detection process function
  • The Kinesis data stream sink, used to deliver the output into the output stream

The anomaly detection function is implemented as a ProcessFunction<String, OutputWithLabel>. Its method MPProcessFunction#processElement is called for every data point:

@Override
public void processElement(String dataPoint, ProcessFunction<String, OutputWithLabel>.Context context,
                            Collector<OutputWithLabel> collector) {

   Double record = Double.parseDouble(dataPoint);

   int currentIndex = timeSeriesData.add(record);

   Double minDistance = 0.0;
   String anomalyTag = "INITIALISING";

   if (timeSeriesData.readyToCompute()) {
       minDistance = timeSeriesData.computeNearestNeighbourDistance();
       threshold.update(minDistance);
   }

   /*
   * Algorithm will wait for initializationPeriods * sequenceLength data points until starting
   * to compute the Matrix Profile (MP).
   */
   if (timeSeriesData.readyToInfer()) {
       anomalyTag = minDistance > threshold.getThreshold() ? "IS_ANOMALY" : "IS_NOT_ANOMALY";
   }

   OutputWithLabel output = new OutputWithLabel(currentIndex, record, minDistance, anomalyTag);

   collector.collect(output);
}

For every incoming data point, the anomaly detection algorithm takes the following actions:

  1. Adds the record to the timeSeriesData.
  2. If it has observed at least 2 * sequenceLength data points, starts computing the matrix profile.
  3. If it has observed at least initializationPeriods * sequenceLength data points, starts outputting anomaly labels.

Following these actions, the MPProcessFunction function outputs an OutputWithLabel object with four attributes:

  • index – The index of the data point in the time series
  • input – The input data without any transformation (identity function)
  • mp – The distance to the closest non-self-matching subsequence for the subsequence ending in index
  • anomalyTag – A label that indicates whether the subsequence is an anomaly (IS_ANOMALY or IS_NOT_ANOMALY), or INITIALISING while the algorithm is still warming up

In the provided implementation, the threshold is learned online by fitting a normal distribution to the matrix profile data:

/*
 * Computes the threshold as two standard deviations away from the mean (p = 0.02)
 *
 * @return an estimated threshold
 */
public Double getThreshold() {
   Double mean = sum/counter;

   return mean + 2 * Math.sqrt(squaredSum/counter - mean*mean);
}

In this example, the algorithm classifies as anomalies those subsequences whose distance from their nearest neighbor deviates significantly from the average minimum distance (more than two standard deviations away from the mean).
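
To see the numbers this rule produces, the following Python sketch mirrors the running sums used by getThreshold. The update method is an assumption, because only getThreshold is shown above, and the class name RunningThreshold is hypothetical.

```python
import math


class RunningThreshold:
    """Tracks mean + 2 standard deviations of the values seen so far."""

    def __init__(self):
        self.counter = 0
        self.total = 0.0
        self.squared_total = 0.0

    def update(self, value: float) -> None:
        # Assumed bookkeeping: maintain the count, sum, and sum of squares.
        self.counter += 1
        self.total += value
        self.squared_total += value * value

    def get_threshold(self) -> float:
        mean = self.total / self.counter
        # Population variance E[x^2] - E[x]^2, clamped at zero to guard
        # against tiny negative values caused by floating-point rounding.
        variance = max(0.0, self.squared_total / self.counter - mean * mean)
        return mean + 2 * math.sqrt(variance)


threshold = RunningThreshold()
for distance in [1.0, 2.0, 3.0, 4.0, 5.0]:
    threshold.update(distance)
print(threshold.get_threshold())
```

With these five distances the mean is 3 and the variance is 2, so any new minimum distance above roughly 5.83 would be labeled an anomaly.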

The TimeSeries class implements the data structure that retains the context window, namely, the internally stored records that are used for comparison against the new incoming records. In the provided implementation, the n most recent records are retained, and when the TimeSeries object is at capacity, the oldest records are overridden.
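
A fixed-capacity window of this kind can be sketched in Python with collections.deque, which discards the oldest element automatically once maxlen is reached. The class name ContextWindow and its methods are hypothetical and only loosely mirror the behavior described for TimeSeries.

```python
from collections import deque


class ContextWindow:
    """Keeps only the most recent `capacity` records of a stream."""

    def __init__(self, capacity: int):
        self.records = deque(maxlen=capacity)
        self.count = 0  # total number of records ever added

    def add(self, record: float) -> int:
        """Store the record and return its index in the overall stream."""
        self.records.append(record)  # evicts the oldest record when full
        index = self.count
        self.count += 1
        return index


window = ContextWindow(capacity=3)
for value in [1.0, 2.0, 3.0, 4.0]:
    window.add(value)
print(list(window.records))
```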

Prerequisites

Before you create a Kinesis Data Analytics application for this exercise, create two Kinesis data streams: InputStream and OutputStream in us-east-1. The Flink application will use these streams as its respective source and destination streams. To create these resources, launch the following AWS CloudFormation stack:

Launch Stack

Alternatively, follow the instructions in Creating and Updating Data Streams.

Create the application

To create your application, complete the following steps:

  1. Clone the remote repository using the following command:
    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples 

  2. Navigate to the amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core directory.
    cd amazon-kinesis-data-analytics-java-examples/AnomalyDetection/LeftDiscords/core

  3. Create your JAR file by running the following Maven command in the core directory, which contains the pom.xml file:
    mvn package -Dflink.version=1.15.4
  4. Create an Amazon Simple Storage Service (Amazon S3) bucket and upload the file target/left-discords-1.0.0.jar.
  5. Create and run a Kinesis Data Analytics application as described in Create and Run the Kinesis Data Analytics Application:
    1. Use the target/left-discords-1.0.0.jar.
    2. Note that the input and output streams are called InputStream and OutputStream, respectively.
    3. The provided example is set up to run in us-east-1.

Populate the input stream

You can populate InputStream by running the script.py file from the cloned repository, using the command python script.py. By editing the last two lines, you can populate the stream with synthetic data or with real data from a CSV dataset.
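
If you prefer to write your own producer, the following Python sketch shows one possible approach. It is not the script.py from the repository: the helper names, the sine-wave generator, and the partition key are assumptions. It sends each data point as a plain numeric string, which matches the Double.parseDouble call in processElement, and uses the Kinesis PutRecord API (put_record in boto3).

```python
import math


def synthetic_series(length: int, period: int = 50, anomaly_at: int = -1):
    """Yield a sine wave; optionally flatten one period to form an anomaly."""
    for i in range(length):
        if anomaly_at >= 0 and anomaly_at <= i < anomaly_at + period:
            yield 0.0  # flat segment that breaks the repeating pattern
        else:
            yield math.sin(2 * math.pi * i / period)


def send(kinesis_client, stream_name: str, values) -> int:
    """Send each value as its own record and return the number sent."""
    sent = 0
    for value in values:
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=str(value).encode("utf-8"),
            PartitionKey="left-discords",  # hypothetical partition key
        )
        sent += 1
    return sent


values = list(synthetic_series(200, period=50, anomaly_at=100))
print(values[:3])

# To publish (requires boto3 and AWS credentials):
#
#   import boto3
#   kinesis = boto3.client("kinesis", region_name="us-east-1")
#   send(kinesis, "InputStream", values)
```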

Visualize data on Kinesis Data Analytics Studio

Kinesis Data Analytics Studio provides the perfect setup for observing data in real time. The following screenshot shows sample visualizations. The first plot shows the incoming time series data, the second plot shows the matrix profile, and the third plot shows which data points have been classified as anomalies.

To visualize the data, complete the following steps:

  1. Create a notebook.
  2. Add the following paragraphs to the Zeppelin note:

Create a table and define the shape of the records generated by the application:

%flink.ssql

CREATE TABLE data (
index INT,
input VARCHAR(6),
mp VARCHAR(6),
anomalyTag VARCHAR(20)
)
PARTITIONED BY (index)
WITH (
'connector' = 'kinesis',
'stream' = 'OutputStream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)

Visualize the input data (choose Line Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, input FROM data;

Visualize the output matrix profile data (choose Scatter Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, mp FROM data;

Visualize the labeled data (choose Scatter Chart from the visualization options):

%flink.ssql(type=update)

SELECT index, anomalyTag FROM data;

Clean up

To delete all the resources that you created, follow the instructions in Clean Up AWS Resources.

Future developments

In this section, we discuss future developments for this solution.

Optimize for speed

The online time series discords algorithm is further developed and optimized for speed in Lu et al., 2022. The proposed optimizations include:

  • Early stopping – If the algorithm finds a subsequence that is similar enough (below the threshold), it stops searching and marks the query as non-anomaly.
  • Look-ahead windowing – Look at some amount of data in the future and compare it to the current query to cheaply discover and prune future subsequences that could not be left-discords. Note that this introduces some delay. The reason why disqualifying improves performance is that data points that are close in time are more likely to be similar than data points that are distant in time.
  • Use of MASS – The MASS (Mueen’s Algorithm for Similarity Search) search algorithm is designed for efficiently discovering the most similar subsequence in the past.
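
Of these optimizations, early stopping is the simplest to picture. The following Python sketch is an illustration under simplifying assumptions (plain Euclidean distance, a fixed threshold, hypothetical function names), not the DAMP implementation. It scans earlier subsequences from newest to oldest and stops at the first one that is close enough.

```python
import math


def euclidean(a, b):
    """Plain Euclidean distance between two equal-length sequences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def is_left_discord(series, n, threshold):
    """Return (is_discord, comparisons) for the latest length-n subsequence.

    The scan ends as soon as one earlier, non-overlapping subsequence lies
    within the threshold, because the query can then no longer be a discord.
    """
    last_start = len(series) - n
    query = series[last_start:]
    comparisons = 0
    for start in range(last_start - n, -1, -1):
        comparisons += 1
        if euclidean(series[start:start + n], query) < threshold:
            return False, comparisons  # early stop: a similar match exists
    return True, comparisons


regular = [0, 1] * 6
disturbed = [0, 1] * 5 + [5, 5]

print(is_left_discord(regular, n=4, threshold=0.5))
print(is_left_discord(disturbed, n=4, threshold=0.5))
```

For the regular series a single comparison is enough to rule out an anomaly, whereas the disturbed series requires a scan of all five earlier candidates before it is declared a discord.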

Parallelization

The algorithm above operates with parallelism 1, which means that when a single worker is enough to handle the data stream throughput, the algorithm can be used directly. This design can be enhanced with further distribution logic for handling high-throughput scenarios. In order to parallelise this algorithm, you may need to design a partitioner operator that ensures that the anomaly detection operators have the relevant past data points at their disposal. The algorithm can maintain a set of the most recent records to which it compares the query. Efficiency and accuracy trade-offs of approximate solutions are interesting to explore. Since the best solution for parallelising the algorithm depends largely on the nature of the data, we recommend experimenting with various approaches using your domain-specific knowledge.

Conclusion

In this post, we presented a streaming version of an anomaly detection algorithm based on left-discords. By implementing this solution, you learned how to deploy an Apache Flink-based anomaly detection solution on Kinesis Data Analytics, and you explored the potential of Kinesis Data Analytics Studio for visualizing and interacting with streaming data in real time. For more details on how to implement anomaly detection solutions in Apache Flink, refer to the GitHub repository that accompanies this post. To learn more about Kinesis Data Analytics and Apache Flink, explore the Amazon Kinesis Data Analytics Developer Guide.

Give it a try and share your feedback in the comments section.


About the Authors

Antonio Vespoli is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Samuel Siebenmann is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Nuno Afonso is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Softbrain provides advanced analytics to sales customers with Amazon QuickSight

Post Syndicated from Kenta Oda original https://aws.amazon.com/blogs/big-data/softbrain-provides-advanced-analytics-to-sales-customers-with-amazon-quicksight/

This is a guest post by Kenta Oda from SOFTBRAIN Co., Ltd.

Softbrain is a leading Japanese producer of software for sales force automation (SFA) and customer relationship management (CRM). Our main product, e-Sales Manager (eSM), is an SFA/CRM tool that provides sales support to over 5,500 companies in Japan. We provide our sales customers with a one-stop source for information and visualization of sales activity, improving their efficiency and agility, which leads to greater business opportunity.

With increasing demand from our customers for analyzing data from different angles throughout the sales process, we needed an embedded analytics tool. We chose Amazon QuickSight, a cloud-native business intelligence (BI) tool that allows you to embed insightful analytics into any application with customized, interactive visuals and dashboards. It integrates seamlessly with eSM and is easy to use at a low cost.

In this post, we discuss how QuickSight is helping us provide our sales customers with the insights they need, and why we consider this business decision a win for Softbrain.

There were four things we were looking for in an embedded analytics solution:

  • Rich visualization – With our previous solution, which was built in-house, there were only four types of visuals, so it was difficult to combine multiple graphs for an in-depth analysis.
  • Development speed – We needed to be able to quickly implement BI functionalities. QuickSight requires minimal development due to its serverless architecture, embedding, and API.
  • Cost – We moved from Tableau to QuickSight because QuickSight allowed us to provide data analysis and visualizations to our customers at a competitive price—ensuring that more of our customers can afford it.
  • Ease of use – QuickSight is cloud-based and has an intuitive UX for our sales customers to work with.

Innovating with QuickSight

Individual productivity must be greatly improved to keep up with the shifting labor market in Japan. At Softbrain, we aim to innovate using the latest technology to provide science-based insights into customer and sales interactions, enabling those who use eSM to be much more productive. Sales reps and managers are able to make informed decisions.

By using QuickSight as our embedded analytics solution, we can offer data visualizations at a much lower price point, making it much more accessible for our customers than we could with other BI solutions. When we combine the process management system offered by eSM with the intuitive user experience and rich visualization capability of QuickSight, we empower customers to understand their sales data, which sits in Amazon Simple Storage Service (Amazon S3) and Amazon Aurora, and act on it.

Seamless console embedding

What sets QuickSight apart from other BI tools is console embedding, which means our customers have the ability to build their own dashboards within eSM. They can choose which visualizations they want and take an in-depth look at their data. Sales strategy requires agility, and our customers need more than a fixed dashboard. QuickSight offers freedom and flexibility with console embedding.

Console embedding allows eSM to be a one-stop source for all the information sales reps and managers need. They can access all the analyses they need to make decisions right from their web browser because QuickSight is fully managed and serverless. With other BI solutions, the user would need to have the client application installed on their computer to create their own dashboards.

Empowering our sales customers

With insights from QuickSight embedded into eSM, sales reps can analyze the gap between their budget and actual revenue to build an action plan to fill the gap. They can use their dashboards to analyze data on a weekly and monthly basis. They can share this information at meetings and explore the data to figure out why there might be low attainment for certain customers. Our customers can use eSM and QuickSight to understand why win or loss opportunities are increasing. Managers can analyze and compare the performance of their sales reps to learn what high-performing reps are doing and help low performers. Sales reps can also evaluate their own performance.

Driving 95% customer retention rate

All of these insights come from putting sales data into eSM and QuickSight. It’s no secret that our customers love QuickSight. We can boast a 95% customer retention rate and offer QuickSight as an embedded BI solution at the largest scale in Japan.

To learn more about how you can embed customized data visuals and interactive dashboards into any application, visit Amazon QuickSight Embedded.


About the author

Kenta Oda is the Chief Technology Officer at SOFTBRAIN Co., Ltd. He is responsible for new product development, with keen insight into better customer experience and go-to-market strategy.

Stream data with Amazon MSK Connect using an open-source JDBC connector

Post Syndicated from Manish Virwani original https://aws.amazon.com/blogs/big-data/stream-data-with-amazon-msk-connect-using-an-open-source-jdbc-connector/

Customers are adopting Amazon Managed Service for Apache Kafka (Amazon MSK) as a fast and reliable streaming platform to build their enterprise data hub. In addition to streaming capabilities, setting up Amazon MSK enables organizations to use a pub/sub model for data distribution with loosely coupled and independent components.

To publish and distribute the data between Apache Kafka clusters and other external systems including search indexes, databases, and file systems, you’re required to set up Apache Kafka Connect, which is the open-source component of the Apache Kafka framework, to host and run connectors for moving data between various systems. As the number of upstream and downstream applications grows, so does the complexity of managing, scaling, and administering the Apache Kafka Connect clusters. To address these scalability and manageability concerns, Amazon MSK Connect provides the functionality to deploy fully managed connectors built for Apache Kafka Connect, with the capability to automatically scale to adjust to workload changes and pay only for the resources consumed.

In this post, we walk through a solution to stream data from Amazon Relational Database Service (Amazon RDS) for MySQL to an MSK cluster in real time by configuring and deploying a connector using Amazon MSK Connect.

Solution overview

For our use case, an enterprise wants to build a centralized data repository that has multiple producer and consumer applications. To support streaming data from applications with different tools and technologies, Amazon MSK is selected as the streaming platform. One of the primary applications that currently writes data to Amazon RDS for MySQL would require major design changes to publish data to MSK topics and write to the database at the same time. Therefore, to minimize the design changes, this application would continue writing the data to Amazon RDS for MySQL with the additional requirement to synchronize this data with the centralized streaming platform Amazon MSK to enable real-time analytics for multiple downstream consumers.

To solve this use case, we propose the following architecture that uses Amazon MSK Connect, a feature of Amazon MSK, to set up a fully managed Apache Kafka Connect connector for moving data from Amazon RDS for MySQL to an MSK cluster using the open-source JDBC connector from Confluent.

Set up the AWS environment

To set up this solution, you need to create a few AWS resources. The AWS CloudFormation template provided in this post creates all the AWS resources required as prerequisites:

The following table lists the parameters you must provide for the template.

Parameter Name | Description | Keep Default Value
Stack name | Name of CloudFormation stack. | No
DBInstanceID | Name of RDS for MySQL instance. | No
DBName | Database name to store sample data for streaming. | Yes
DBInstanceClass | Instance type for RDS for MySQL instance. | No
DBAllocatedStorage | Allocated size for DB instance (GiB). | No
DBUsername | Database user for MySQL database access. | No
DBPassword | Password for MySQL database access. | No
JDBCConnectorPluginBukcetName | Bucket for storing MSK Connect connector JAR files and plugin. | No
ClientIPCIDR | IP address of client machine to connect to EC2 instance. | No
EC2KeyPair | Key pair to be used in your EC2 instance. This EC2 instance will be used as a proxy to connect from your local machine to the EC2 client instance. | No
EC2ClientImageId | Latest AMI ID of Amazon Linux 2. You can keep the default value for this post. | Yes
VpcCIDR | IP range (CIDR notation) for this VPC. | No
PrivateSubnetOneCIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | No
PrivateSubnetTwoCIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | No
PrivateSubnetThreeCIDR | IP range (CIDR notation) for the private subnet in the third Availability Zone. | No
PublicSubnetCIDR | IP range (CIDR notation) for the public subnet. | No

To launch the CloudFormation stack, choose Launch Stack:

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the resource details.

Validate sample data in the RDS for MySQL instance

To prepare the sample data for this use case, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>

  2. Run the following commands to validate the data has been loaded successfully:
    $ mysql -h <rds_instance_endpoint_name> -u <user_name> -p
    
    MySQL [(none)]> use dms_sample;
    
    MySQL [dms_sample]> select mlb_id, mlb_name, mlb_pos, mlb_team_long, bats, throws from mlb_data limit 5;

Synchronize all tables’ data from Amazon RDS to Amazon MSK

To sync all tables from Amazon RDS to Amazon MSK, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Custom plugins in the navigation pane under MSK Connect.
  2. Choose Create custom plugin.
  3. For S3 URI – Custom plugin object, browse to the ZIP file named confluentinc-kafka-connect-jdbc-plugin.zip (created by the CloudFormation template) for the JDBC connector in the S3 bucket bkt-msk-connect-plugins-<aws_account_id>.
  4. For Custom plugin name, enter msk-confluent-jdbc-plugin-v1.
  5. Enter an optional description.
  6. Choose Create custom plugin.

After the custom plugin has been successfully created, it will be available in Active status.

  1. Choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1 that you created earlier.
  4. Choose Next.
  5. For Connector name, enter msk-jdbc-connector-rds-to-msk.
  6. Enter an optional description.
  7. For Cluster type, select MSK cluster.
  8. For MSK clusters, select the cluster you created earlier.
  9. For Authentication, choose IAM.
  10. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    
    ### Provide the configuration properties to connect to source and destination endpoints including authentication
    
    ### mechanism, credentials and task details such as polling interval, source and destination object names, data
    
    ### transfer mode, parallelism
    
    ### Many of these properties are connector and end-point specific, so please review the connector documentation
    
    ### for more details
    
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    
    connection.user=admin
    
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    
    connection.password=XXXXX
    
    tasks.max=1
    
    poll.interval.ms=300000
    
    topic.prefix=rds-to-msk-
    
    mode=bulk
    
    connection.attempts=1
    
    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    key.converter=org.apache.kafka.connect.storage.StringConverter
    
    value.converter.schemas.enable=false
    
    key.converter.schemas.enable=false
    
    ###GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    
    security.protocol=SASL_SSL
    
    sasl.mechanism=AWS_MSK_IAM
    
    ssl.truststore.location=~/kafka.truststore.jks
    
    ssl.keystore.location=~/kafka.client.keystore.jks
    
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

The following table provides a brief summary of all the preceding configuration options.

Configuration Options | Description
connector.class | Java class for the connector
connection.user | User name to authenticate with the MySQL endpoint
connection.url | JDBC URL identifying the hostname and port number for the MySQL endpoint
connection.password | Password to authenticate with the MySQL endpoint
tasks.max | Maximum number of tasks to be launched for this connector
poll.interval.ms | Time interval in milliseconds between subsequent polls for each table to pull new data
topic.prefix | Custom prefix value to prepend to each table name when creating topics in the MSK cluster
mode | The operation mode for each poll, such as bulk, timestamp, incrementing, or timestamp+incrementing
connection.attempts | Maximum number of retries for JDBC connection
security.protocol | Sets up TLS for encryption and SASL for authentication
sasl.mechanism | Identifies the SASL mechanism to use
ssl.truststore.location | Location for storing trusted certificates
ssl.keystore.location | Location for storing private keys
sasl.client.callback.handler.class | Encapsulates constructing a SigV4 signature based on extracted credentials
sasl.jaas.config | Binds the SASL client implementation
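
To make the difference between the bulk and incrementing modes concrete, the following Python sketch simulates two poll cycles against an in-memory SQLite table. It is an illustration only, not the connector's code: SQLite stands in for MySQL, and the poll function, the player table layout, and the sample rows are hypothetical.

```python
import sqlite3
from typing import List, Tuple


def poll(conn: sqlite3.Connection, table: str, mode: str,
         last_id: int = 0) -> Tuple[List[tuple], int]:
    """Simulate one poll cycle and return (rows, new_offset).

    The table name is interpolated directly, so it must come from trusted
    configuration; this is acceptable only because this is a toy example.
    """
    if mode == "bulk":
        # bulk: re-read the whole table on every poll, no offset tracking.
        rows = conn.execute(
            f"SELECT id, name FROM {table} ORDER BY id").fetchall()
        return rows, last_id
    if mode == "incrementing":
        # incrementing: fetch only rows whose ID is above the stored offset.
        rows = conn.execute(
            f"SELECT id, name FROM {table} WHERE id > ? ORDER BY id",
            (last_id,)).fetchall()
        return rows, (rows[-1][0] if rows else last_id)
    raise ValueError(f"mode not covered by this sketch: {mode}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE player (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO player VALUES (?, ?)", [(1, "a"), (2, "b")])

first, offset = poll(conn, "player", "incrementing")
conn.execute("INSERT INTO player VALUES (3, 'c')")
second, offset = poll(conn, "player", "incrementing", offset)
bulk_rows, _ = poll(conn, "player", "bulk")

print(len(first), len(second), len(bulk_rows))
```

With mode=bulk, as configured in this post, every poll republishes all rows, so consumers see the full table again at each poll.interval.ms (300000 ms, or 5 minutes, in the preceding configuration). An incrementing column avoids the repeats but only captures newly inserted rows.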

  1. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  2. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  3. For Worker configuration, select Use the MSK default configuration.
  4. Under Access permissions, choose the custom IAM role msk-connect-rds-jdbc-MSKConnectServiceIAMRole-* created earlier.
  5. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  6. For Log group, choose the log group msk-jdbc-source-connector created earlier.
  7. Choose Next.
  8. Under Review and Create, validate all the settings and choose Create connector.

After the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.
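
If you prefer to wait for the status change from a script rather than on the console, a small polling helper along the following lines may be useful. This is a sketch under assumptions: the helper name, timeout, and polling interval are hypothetical, and the state lookup is passed in as a function so that the block runs without AWS credentials.

```python
import time
from typing import Callable


def wait_for_running(get_state: Callable[[], str],
                     timeout_s: float = 900,
                     poll_s: float = 15,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until the connector reports RUNNING.

    Returns True once RUNNING is seen, False when the timeout elapses, and
    raises RuntimeError if the connector reports FAILED.
    """
    waited = 0.0
    while True:
        state = get_state()
        if state == "RUNNING":
            return True
        if state == "FAILED":
            raise RuntimeError("connector creation failed")
        if waited >= timeout_s:
            return False
        sleep(poll_s)
        waited += poll_s


# Simulated state sequence standing in for real status lookups.
states = iter(["CREATING", "CREATING", "RUNNING"])
print(wait_for_running(lambda: next(states), sleep=lambda _: None))
```

With boto3, the lookup could plausibly be supplied as a function that calls describe_connector on the kafkaconnect client with your connector ARN and returns the connectorState field of the response. Verify those names against the current SDK documentation before relying on them.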

Validate the data

To validate and compare the data, complete the following steps:

  1. SSH into the EC2 client instance MSKEC2Client using the following command from your local terminal:
    ssh -i <keypair> <user>@<hostname>

  2. To connect to the MSK cluster with IAM authentication, add the latest version of the aws-msk-iam-auth JAR file to the class path:
    $ export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.1.0-all.jar

  3. On the Amazon MSK console, choose Clusters in the navigation pane and choose the cluster MSKConnect-msk-connect-rds-jdbc.
  4. On the Cluster summary page, choose View client information.
  5. In the View client information section, under Bootstrap servers, copy the private endpoint for Authentication type IAM.

  1. Set up additional environment variables for working with the Apache Kafka installation and connecting to the Amazon MSK bootstrap servers, where <bootstrap servers> is the list of bootstrap servers that allow connecting to the MSK cluster with IAM authentication:
    $ export PATH=~/kafka/bin:$PATH
    
    $ cp ~/aws-msk-iam-auth-1.1.0-all.jar ~/kafka/libs/.
    
    $ export BOOTSTRAP_SERVERS=<bootstrap servers>

  2. Set up a config file named client.properties to be used for authentication:
    $ cd /home/ec2-user/kafka/config/
    
    $ vi client.properties
    
    # Sets up TLS for encryption and SASL for authN.
    
    security.protocol = SASL_SSL
    
    # Identifies the SASL mechanism to use.
    
    sasl.mechanism = AWS_MSK_IAM
    
    # Binds SASL client implementation.
    
    sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
    
    # Encapsulates constructing a SigV4 signature based on extracted credentials.
    
    # The SASL client bound by "sasl.jaas.config" invokes this class.
    
    sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

  3. Validate the list of topics created in the MSK cluster:
    $ cd /home/ec2-user/kafka/
    
    $ bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVERS --command-config /home/ec2-user/kafka/config/client.properties

  1. Validate that data has been loaded to the topics in the MSK cluster:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-seat --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties
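
Because the connector uses JsonConverter with value.converter.schemas.enable=false, each message value is a plain JSON object without a schema envelope. The following Python sketch shows one way to sanity-check the consumer output by counting records and field names. The sample messages and their fields are made up for illustration and are not the actual schema of the seat table.

```python
import json
from collections import Counter
from typing import Iterable, Tuple


def summarize(lines: Iterable[str]) -> Tuple[int, Counter]:
    """Parse one JSON object per line; return (record count, field counts)."""
    records = []
    for line in lines:
        line = line.strip()
        # Skip blank lines and anything that is not a JSON object.
        if not line.startswith("{"):
            continue
        records.append(json.loads(line))
    field_counts = Counter(key for record in records for key in record)
    return len(records), field_counts


# Hypothetical consumer output; real field names depend on the source table.
sample_output = [
    '{"id": 1, "section": "A", "row": "1"}',
    "",
    '{"id": 2, "section": "A", "row": "2"}',
]
count, fields = summarize(sample_output)
print(count, dict(fields))
```

To apply this to real data, you could pass sys.stdin to summarize and pipe the output of the preceding kafka-console-consumer.sh command into the script. Comparing the record count with a SELECT COUNT(*) on the source table gives a quick consistency check, bearing in mind that bulk mode republishes the table on every poll.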

Synchronize data using a query to Amazon RDS and write to Amazon MSK

To synchronize the results of a query that flattens data by joining multiple tables in Amazon RDS for MySQL, create an Amazon MSK Connect managed connector with the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Choose Create connector.
  3. Select Use existing custom plugin and under Custom plugins, select the plugin msk-confluent-jdbc-plugin-v1.
  4. For Connector name, enter msk-jdbc-connector-rds-to-msk-query.
  5. Enter an optional description.
  6. For Cluster type, select MSK cluster.
  7. For MSK clusters, select the cluster you created earlier.
  8. For Authentication, choose IAM.
  9. Under Connector configurations, enter the following settings:
    ### CONNECTOR SPECIFIC SETTINGS
    
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    
    connection.user=admin
    
    connection.url=jdbc:mysql://<rds_instance_endpoint_name>:3306/dms_sample
    
    connection.password=XXXXX
    
    tasks.max=1
    
    poll.interval.ms=300000
    
    topic.prefix=rds-to-msk-query-topic
    
    mode=bulk
    
    connection.attempts=1
    
    query=select last_name, name as team_name, sport_type_name, sport_league_short_name, sport_division_short_name from dms_sample.sport_team join dms_sample.player on player.sport_team_id = sport_team.id;
    
    ### CONVERTING KAFKA MESSAGE BYTES TO JSON
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    key.converter=org.apache.kafka.connect.storage.StringConverter
    
    value.converter.schemas.enable=false
    
    key.converter.schemas.enable=false
    
    ###GENERIC AUTHENTICATION SETTINGS FOR KAFKA CONNECT
    
    security.protocol=SASL_SSL
    
    sasl.mechanism=AWS_MSK_IAM
    
    ssl.truststore.location=~/kafka.truststore.jks
    
    ssl.keystore.location=~/kafka.client.keystore.jks
    
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

  10. In the Connector capacity section, select Autoscaled for Capacity type and keep the default value of 1 for MCU count per worker.
  11. Set 4 for Maximum number of workers and keep all other default values for Workers and Autoscaling utilization thresholds.
  12. For Worker configuration, select Use the MSK default configuration.
  13. Under Access permissions, choose the custom IAM role role_msk_connect_serivce_exec_custom.
  14. For Log delivery, select Deliver to Amazon CloudWatch Logs.
  15. For Log group, choose the log group created earlier.
  16. Choose Next.
  17. Under Review and Create, validate all the settings and choose Create connector.

Once the connector has transitioned to RUNNING status, the data should start flowing from the RDS instance to the MSK cluster.

  1. For data validation, SSH into the EC2 client instance MSKEC2Client and run the following command to see the data in the topic:
    $ bin/kafka-console-consumer.sh --topic rds-to-msk-query-topic --from-beginning --bootstrap-server $BOOTSTRAP_SERVERS --consumer.config /home/ec2-user/kafka/config/client.properties
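
To see what the join in the query setting produces, the following Python sketch runs the same SELECT against two toy tables in an in-memory SQLite database and prints each flattened row as JSON. Only the column names come from the query in this post. The table definitions, column types, and sample rows are assumptions, and SQLite stands in for MySQL.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each row be converted to a dict

# Hypothetical, minimal versions of the two source tables.
conn.executescript("""
CREATE TABLE sport_team (
    id INTEGER PRIMARY KEY,
    name TEXT,
    sport_type_name TEXT,
    sport_league_short_name TEXT,
    sport_division_short_name TEXT
);
CREATE TABLE player (
    id INTEGER PRIMARY KEY,
    sport_team_id INTEGER,
    last_name TEXT
);
INSERT INTO sport_team VALUES (1, 'Example Team', 'baseball', 'MLB', 'AL');
INSERT INTO player VALUES (10, 1, 'Doe'), (11, 1, 'Roe');
""")

# Same projection and join condition as the connector's query setting,
# without the dms_sample schema prefix.
QUERY = """
SELECT last_name, name AS team_name, sport_type_name,
       sport_league_short_name, sport_division_short_name
FROM sport_team
JOIN player ON player.sport_team_id = sport_team.id
"""

rows = [dict(row) for row in conn.execute(QUERY)]
messages = sorted(json.dumps(row) for row in rows)
for message in messages:
    print(message)
```

Each player row is combined with the attributes of its team, so downstream consumers receive one denormalized record per player and do not need to join topics themselves.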

Clean up

To clean up your resources and avoid ongoing charges, complete the following steps:

  1. On the Amazon MSK console, choose Connectors in the navigation pane under MSK Connect.
  2. Select the connectors you created and choose Delete.
  3. On the Amazon S3 console, choose Buckets in the navigation pane.
  4. Search for the bucket with the naming convention bkt-msk-connect-plugins-<aws_account_id>.
  5. Delete all the folders and objects in this bucket.
  6. Delete the bucket after all contents have been removed.
  7. To delete all other resources created using the CloudFormation stack, delete the stack via the AWS CloudFormation console.

Conclusion

Amazon MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance the workloads. In this post, we saw how to set up the open-source JDBC connector from Confluent to stream data between an RDS for MySQL instance and an MSK cluster. We also explored different options to synchronize all the tables as well as use the query-based approach to stream denormalized data into the MSK topics.

For more information about Amazon MSK Connect, see Getting started using MSK Connect.


About the Authors

Manish Virwani is a Sr. Solutions Architect at AWS. He has more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and partners.

Indira Balakrishnan is a Principal Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

Peloton embraces Amazon Redshift to unlock the power of data during changing times

Post Syndicated from Phil Goldstein original https://aws.amazon.com/blogs/big-data/peloton-embraces-amazon-redshift-to-unlock-the-power-of-data-during-changing-times/

Credit: Phil Goldstein
Jerry Wang, Peloton’s Director of Data Engineering (left), and Evy Kho, Peloton’s Manager of Subscription Analytics, discuss how the company has benefited from using Amazon Redshift.

New York-based Peloton, which aims to help people around the world reach their fitness goals through its connected fitness equipment and subscription-based classes, saw booming growth in the early stage of the COVID-19 pandemic. As gyms shuttered in 2020 and people looked for ways to stay active from the safety of their homes, the company’s annual revenue soared from $915 million in 2019 to $4 billion in 2021. Meanwhile, the company’s subscribers jumped from around 360,000 in 2019 to 2.76 million at the end of 2022.

As Peloton’s business continued to evolve amid a changing macroeconomic environment, it was essential that it could make smart business decisions quickly, and one of the best ways to do that was to harness insights from the huge amount of data that it had been gathering over recent years.

During that same time, AWS has been focused on helping customers manage their ever-growing volumes of data with tools like Amazon Redshift, the first fully managed, petabyte-scale cloud data warehouse. The service has grown into a multifaceted service used by tens of thousands of customers to process exabytes of data on a daily basis (1 exabyte is equivalent to 119 billion song downloads). With Amazon Redshift, you get access to a modern data architecture that helps you break down internal data silos, share data securely and seamlessly, and support multiple users who don’t have specialized data and analytics skills.

When Jerry Wang, Peloton’s director of data engineering, joined the company in 2019, he needed to make sure the service would handle the company’s massive and growing amounts of data. He also needed to ensure Amazon Redshift could help the company efficiently manage the wide variety of data and the users who would need to access it, and deliver insights on that data at high velocity—all while being cost-effective and secure.

Wang was delighted to see that as Peloton experienced its massive growth and change, AWS continued to release new Amazon Redshift features and associated capabilities that would perfectly suit his company’s needs at just the right time.

“Over the years, I’ve always been in the stage where I hope Redshift can have a new, specific feature,” Wang says, “and then, in a very limited amount of time, AWS releases that kind of feature.”

Peloton’s data volumes soar as the business grows

Man working out with a weight while viewing a Peloton class on his TV in a living room.

Credit: Peloton

As Peloton’s business has evolved, the amount of data it is generating and analyzing has grown exponentially. From 2019 to now, Wang reports the amount of data the company holds has grown by a factor of 20. In fact, a full 95% of the total historical data the company has generated has come in the last 4 years. This growth has been driven both by surges in the number of users on Peloton’s platform and the variety of data the company is collecting.

Peloton collects reams of data on its sales of internet-connected exercise equipment like stationary bikes and treadmills. The company also collects data on customers’ workouts, which it then provides back to them in various reports such as a monthly summary, giving them insights into how often they worked out, their best output, trends in their workouts, the instructor they used the most, how many calories they burned, and more. All of this data helps Peloton make strategic business decisions, refine its operations to become more efficient, adjust its programming, and drive subscriber engagement and growth.

In 2019 and into 2020, as Peloton’s business boomed, the company needed an analytics system that could help it manage an explosion of data, both from users and related to its business. The company embraced Amazon Redshift because of the service’s versatility, ease of use, price-performance at scale, continuous pace of innovation, and ability to handle concurrent queries from dozens of internal data teams.

Wang said that when he joined the company, there were two kinds of users who were performing daily data operations in Peloton’s Amazon Redshift data warehouse. One group performed extract, transform, and load (ETL) operations to take raw data and make it available for analysis. The other was a group of business users who, each morning, would perform queries to generate local data visualizations, creating a surge of capacity on the Amazon Redshift data warehouse. “So, when these two loads ran together, the performance suffered directly,” Wang says.

One of the features Peloton adopted was Amazon Redshift Concurrency Scaling, which provides consistent and fast query performance even across thousands of concurrent users and concurrent queries. This helped solve the problem by automatically adding query processing power in seconds and processing queries without delays. When the workload demand subsided, the extra processing power was automatically removed, so Peloton only had to pay for the time when Concurrency Scaling data warehouses were in use. Wang says Peloton was running about 10 hours of Concurrency Scaling on a consistent daily basis to deal with the congestion, which, he says, “solved my problem at that moment.”

In 2020, as the pandemic inspired hordes to hop on bikes in their living rooms, Wang also upgraded Amazon Redshift with the newly introduced Amazon Redshift RA3 instances with managed storage (RMS). These represented a new generation of compute instances with managed, analytics-optimized storage designed for high-transaction, fast query performance and lower costs.

“The new instance … was a great feature for us,” Wang says. “It solved our concern about moving from terabyte scale to petabyte scale.”

Peloton’s business is driven by a variety of data for a wide range of users

Man watching a female Peloton biking instructor through a touch screen display on his Peloton bike.

Credit: Peloton

Peloton’s business model is driven by a wide variety of large volumes of data. In addition to selling bikes, treadmills, and indoor rowing machines, and expanding its subscription platform to include non-equipment-based workouts, the company has dozens of instructors in five countries, and it licenses music from three major music licensors. In 2022, it began renting bikes as well as selling them. Internally, Peloton employees working in finance, accounting, marketing, supply chain operations, music and content, and more are using data to track subscriber growth, content engagement, and which sales channels are leading to the most net new subscriptions.

“There was a time when we were just a bike company, and now we’re so much more than that,” says Evy Kho, manager of subscription analytics at Peloton.

There is also a much wider range of sales channels for Peloton equipment than just a few years ago. In the past, Peloton customers could only purchase bikes through the Peloton website or secondhand. Now, customers can purchase hardware from third-party sites like Amazon. That introduced “a really interesting data problem” for Peloton, says Kho, as it strives to determine how to link subscription signups back to exercise equipment sales.

In the face of this variability, complexity, and need for instant access to data to inform business decision-makers, Peloton embraced Amazon Redshift Serverless as an early adopter after AWS introduced the feature in late 2021. Redshift Serverless allows companies to quickly run and scale analytics capacity without database managers and data engineers needing to manage data warehouse infrastructure.

Redshift Serverless also has the ability to quickly spin up analytics capacity for different users, or personas, within an organization. This allows different teams across Peloton to perform analytics on the same datasets at the same time to generate insights on their individual parts of the business. It’s “incredibly important in terms of assessing what’s been good for our business,” Kho says.

Wang also says Peloton is considering supporting specific personas for those who need analytics around financial information governed by securities regulations, and another for users who need to perform analytics on data governed by regulations around personally identifiable information (PII).

Wang points out that Redshift Serverless also allows him to spin up Amazon Redshift data warehouses to handle special usage patterns. For example, ETL loads are often high I/O but require low CPU resources, and are very predictable because Peloton controls the process. However, when internal users want to perform data analytics or machine learning, the company doesn’t have control over the demand for those queries, and the load on Amazon Redshift data warehouses can be variable, with some queries more CPU-intensive than others. Previously, any provisioned data warehouse would have a fixed cost, and it would have to be provisioned to cope with the highest possible workloads even if the utilization rates turned out to be low. Now, for these different scenarios, Wang creates different Amazon Redshift instances to handle that variability without those heavy, fixed costs.

As Peloton’s use of Amazon Redshift has evolved and matured, its costs have gone down, according to Wang. “If you look at Serverless, the amount … that we spend on the Serverless is actually much smaller than we did previously, compared to the Concurrency Scaling cost.”

In a serverless environment, there is no upfront cost to Peloton. “I can set it up as quickly as I can and we pay as we need it,” Wang says. “It scales up when the load goes up. So, it’s a perfect fit.”

Peloton uses Amazon Redshift to get to insights faster

Women running on a Peloton treadmill with a touch screen display

Credit: Peloton

Peloton’s focus on efficiency and sustainable growth has meant that it needs to act more quickly than ever to make sound, data-informed business decisions. Peloton, Wang notes, is long past the stage where all it cared about was growth. “We are a mature company now, so operational efficiency is very important; it’s key to the business,” he says.

When Peloton launches new products, for example, two things typically happen, Wang says. One is that there is a spike in data volumes, both in traffic to its website and the number of sales transactions it’s processing. The second is that the company’s management team will want real-time updates and analysis of how sales are performing.

Redshift Serverless and data sharing let users quickly start performing real-time analytics and build reporting and dashboard applications without any additional engineering required. Wang confirms this benefit, especially in the example of a new product launch, saying it “will scale up by itself without me having to intervene. I don’t need to allocate a budget. I don’t need to change any configurations.”

In the past, when Peloton only offered its fitness equipment through its own website, it was easy to associate fulfillment data on orders with subscriptions. However, as those channels grew and became more complex, Peloton turned to the data sharing capabilities of Amazon Redshift to share data quickly and easily across teams. Peloton’s teams for subscriber analytics, supply chain, accounting, and more need fast access to fulfillment data to ensure they can track it accurately, respond if changes are needed, and determine how fulfillment data aligns with subscriptions and revenue.

“Getting them those results even faster has been incredibly helpful, and is only becoming more important as we have become far more data-driven than I think you could argue we were before,” Kho says.

Amazon Redshift marries data security, governance, and compliance with innovation

Like all customers, Peloton is concerned about data security, governance, and compliance. With security features like dynamic data masking, role-based access control, and row-level security, Amazon Redshift protects customers’ data with granular authorization features and comprehensive identity management.

Customers also are able to easily provide authorizations for the right users or groups. These features are available out of the box, within the standard pricing model.

Wang notes that Amazon Redshift’s security model is based on a traditional database model, which is a well-understood and robust model. “So for us, to provision access on that model is quite straightforward,” Wang says.

At every stage of Peloton’s evolution over the last 4 years, the company has been able to turn to AWS and Amazon Redshift to help it effectively manage that growth and complexity.

“When I started,” Wang says, “I said, OK, I need a temporary boost in capacity. Then came Concurrency Scaling. And then I said, I need cheaper storage, and [RA3] comes along. And then the ultimate challenge [was], I’m no longer satisfied with a monolithic Redshift instance. Serverless solved that issue.”

Join AWS Data Insights Day 2023

If you want to learn how your company can use Amazon Redshift to analyze large volumes of data in an easy-to-use, scalable, cost-effective, and secure way, don’t miss AWS Data Insights Day on May 24, 2023. During the day-long virtual event, learn from AWS leaders, experts, partners, and customers—including Peloton, Gilead, McDonald’s, Global Foundries, Schneider Electric, and Flutter Entertainment—how Amazon Redshift and features like Amazon Redshift ML are helping drive business innovation, optimization, and cost savings, especially in today’s uncertain economic times.

Register Now for a calendar reminder.

To learn more about Amazon Redshift, see Amazon Redshift and Amazon Redshift: Ten years of continuous reinvention.


About the author

Phil Goldstein is a copywriter and editor with AWS product marketing. He has 15 years of technology writing experience, and prior to joining AWS was a senior editor at a content marketing agency and a business journalist covering the wireless industry.

Amazon OpenSearch Service Under the Hood: Multi-AZ with standby

Post Syndicated from Rohin Bhargava original https://aws.amazon.com/blogs/big-data/amazon-opensearch-service-under-the-hood-multi-az-with-standby/

Amazon OpenSearch Service recently announced Multi-AZ with standby, a new deployment option for managed clusters that enables 99.99% availability and consistent performance for business-critical workloads. With Multi-AZ with standby, clusters are resilient to infrastructure failures like hardware or networking failure. This option provides improved reliability and the added benefit of simplifying cluster configuration and management by enforcing best practices and reducing complexity.

In this post, we share how Multi-AZ with standby works under the hood to achieve high resiliency and consistent performance to meet the four 9s.

Background

One of the principles in designing highly available systems is that they need to be ready for impairments before they happen. OpenSearch is a distributed system that runs on a cluster of instances that have different roles. In OpenSearch Service, you can deploy data nodes to store your data and respond to indexing and search requests, and you can also deploy dedicated cluster manager nodes to manage and orchestrate the cluster. To provide high availability, one common approach for the cloud is to deploy infrastructure across multiple AWS Availability Zones. Even in the rare case that a full zone becomes unavailable, the available zones continue to serve traffic with replicas.

When you use OpenSearch Service, you create indexes to hold your data and specify partitioning and replication for those indexes. Each index is comprised of a set of primary shards and zero to many replicas of those shards. When you additionally use the Multi-AZ feature, OpenSearch Service ensures that primary shards and replica shards are distributed so that they’re in different Availability Zones.

Previously, when there was an impairment in an Availability Zone, the service would scale up in other Availability Zones and redistribute shards to spread out the load evenly. This approach was reactive at best. Additionally, shard redistribution during failure events causes increased resource utilization, leading to increased latencies and overloaded nodes, further impacting availability and effectively defeating the purpose of fault-tolerant, multi-AZ clusters. A more effective, statically stable cluster configuration requires provisioning infrastructure to the point where it can continue operating correctly without having to launch any new capacity or redistribute any shards even if an Availability Zone becomes impaired.
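
The static stability condition comes down to simple arithmetic: after losing any one zone, the remaining zones must still cover the peak load without launching capacity. The following is a minimal sketch of that check; the function name and the capacity units are illustrative assumptions, not part of OpenSearch Service.

```python
from typing import Dict


def is_statically_stable(zone_capacity: Dict[str, float], peak_load: float) -> bool:
    """Return True if the cluster can carry peak_load after losing any one zone.

    zone_capacity maps a zone name to the load it can serve (hypothetical units,
    for example requests per second). No new capacity is assumed to be launched.
    """
    total = sum(zone_capacity.values())
    # Check every possible single-zone loss.
    return all(total - cap >= peak_load for cap in zone_capacity.values())


# Two zones sized to exactly share the load: losing one leaves too little capacity.
print(is_statically_stable({"az1": 50, "az2": 50}, peak_load=100))  # False
# Three zones, each sized for half the load: any two zones still cover the peak.
print(is_statically_stable({"az1": 50, "az2": 50, "az3": 50}, peak_load=100))  # True
```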

Designing for high availability

OpenSearch Service manages tens of thousands of OpenSearch clusters. We’ve gained insights into which cluster configurations, such as hardware (data or cluster manager instance types), storage (EBS volume types), and shard sizes, are more resilient to failures and can meet the demands of common customer workloads. Some of these configurations have been included in Multi-AZ with standby to simplify configuring the clusters. However, this alone is not enough. A key ingredient in achieving high availability is maintaining data redundancy.

When you configure a single replica (two copies) for your indexes, the cluster can tolerate the loss of one shard (primary or replica) and still recover by copying the remaining shard. A two-replica (three copies) configuration can tolerate failure of two copies. In the case of a single replica with two copies, you can still sustain data loss. For example, you could lose data if there is a catastrophic failure in one Availability Zone for a prolonged duration, and at the same time, a node in a second zone fails. To ensure data redundancy at all times, the cluster enforces a minimum of two replicas (three copies) across all its indexes. The following diagram illustrates this architecture.
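
To make the double-fault scenario concrete, the following sketch models shard copies as (zone, node) pairs and checks which copies survive a set of failures. It assumes one copy per zone; the zone and node names are hypothetical.

```python
from typing import Iterable, List, Tuple

ShardCopy = Tuple[str, str]  # (availability zone, node id); names are hypothetical


def surviving_copies(copies: Iterable[ShardCopy],
                     failed_zones: Iterable[str] = (),
                     failed_nodes: Iterable[str] = ()) -> List[ShardCopy]:
    """Return the shard copies that are still available after the given failures."""
    zones, nodes = set(failed_zones), set(failed_nodes)
    return [(z, n) for z, n in copies if z not in zones and n not in nodes]


one_replica = [("az1", "node-a"), ("az2", "node-b")]
two_replicas = one_replica + [("az3", "node-c")]

# Double fault: az1 is down for a long time and node-b in az2 fails too.
print(surviving_copies(one_replica, ["az1"], ["node-b"]))   # [] -> data is lost
print(surviving_copies(two_replicas, ["az1"], ["node-b"]))  # [('az3', 'node-c')]
```

With two replicas, one copy remains to recover from, which is why the cluster enforces three copies.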

The Multi-AZ with standby feature deploys infrastructure in three Availability Zones, keeping two zones active and one zone as standby. The standby zone offers consistent performance even during zonal failures by ensuring the same capacity at all times and by using a statically stable design without any capacity provisioning or data movement during failure. During normal operations, the active zones serve coordinator traffic for read and write requests and shard query traffic, and only replication traffic goes to the standby zone. OpenSearch uses a synchronous replication protocol for write requests, which by design has zero replication lag, enabling the service to instantaneously promote a standby zone to active in the event of any failure in an active zone. This event is referred to as a zonal failover. The previously active zone is demoted to standby mode, and recovery operations begin to bring the state back to healthy.
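
A zonal failover can be pictured as swapping roles between the impaired active zone and the standby zone. The sketch below encodes active zones with a weight of 1 and the standby zone with a weight of 0; the function and zone names are illustrative assumptions rather than the service's implementation.

```python
from typing import Dict


def zonal_failover(weights: Dict[str, int], impaired_zone: str) -> Dict[str, int]:
    """Promote the standby zone and demote the impaired active zone.

    weights maps zone -> 1 (active) or 0 (standby). A new dict is returned.
    """
    if weights.get(impaired_zone) != 1:
        # The impaired zone is already standby (or unknown): nothing to shift.
        return dict(weights)
    updated = dict(weights)
    for zone, weight in weights.items():
        if weight == 0:
            updated[zone] = 1  # standby becomes active
    updated[impaired_zone] = 0  # impaired zone becomes standby while it recovers
    return updated


steady_state = {"az1": 1, "az2": 1, "az3": 0}
print(zonal_failover(steady_state, "az1"))  # {'az1': 0, 'az2': 1, 'az3': 1}
```

Because the standby zone already holds a synchronously replicated copy, the swap needs no data movement in this model.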

Why zonal failover is critical but hard to do right

One or more nodes in an Availability Zone can fail due to a wide variety of reasons, like hardware failures, infrastructure failures like fiber cuts, power or thermal issues, or inter-zone or intra-zone networking problems. Read requests can be served by any of the active zones, whereas write requests need to be synchronously replicated to all copies across multiple Availability Zones. OpenSearch Service orchestrates two modes of failovers: read failovers and the write failovers.

The primary goals of read failovers are high availability and consistent performance. This requires the system to constantly monitor for faults and shift traffic away from the unhealthy nodes in the impacted zone. The system takes care of handling the failovers gracefully, allowing all in-flight requests to finish while simultaneously shifting new incoming traffic to a healthy zone. However, it’s also possible for multiple shard copies across both active zones to be unavailable in cases of two node failures or one zone plus one node failure (often referred to as double faults), which poses a risk to availability. To solve this challenge, the system uses a fail-open mechanism to serve traffic off the third zone while it may still be in standby mode to ensure the system remains highly available. The following diagram illustrates this architecture.
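
The fail-open behavior can be sketched as a routing preference: serve reads from a healthy copy in an active zone, and fall back to the standby zone only when no active copy is available. The function below is a simplified model with hypothetical names, not the service's routing code.

```python
from typing import Dict, List, Optional


def pick_read_zone(copy_healthy: Dict[str, bool],
                   active_zones: List[str],
                   standby_zone: str) -> Optional[str]:
    """Choose a zone to serve a shard read.

    copy_healthy maps zone -> whether the shard copy in that zone is healthy.
    Active zones are preferred; the standby zone is used only as a fail-open
    fallback when no active copy is available.
    """
    for zone in active_zones:
        if copy_healthy.get(zone, False):
            return zone
    if copy_healthy.get(standby_zone, False):
        return standby_zone  # fail open: serve from standby rather than fail
    return None  # no copy is available anywhere


# Normal operation: an active zone serves the read.
print(pick_read_zone({"az1": True, "az2": True, "az3": True}, ["az1", "az2"], "az3"))    # az1
# Double fault: both active copies are down, so the read fails open to az3.
print(pick_read_zone({"az1": False, "az2": False, "az3": True}, ["az1", "az2"], "az3"))  # az3
```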

An impaired network device impacting inter-zone communication can cause write requests to significantly slow down, owing to the synchronous nature of replication. In such an event, the system orchestrates a write failover to isolate the impaired zone, cutting off all ingress and egress traffic. Although recovery with write failovers is immediate, it results in all nodes in the zone, along with their shards, being taken offline. However, after the impacted zone is brought back following network recovery, shard recovery should still be able to use unchanged data from local disk, avoiding a full segment copy. Because a write failover makes the shard copies in the zone unavailable, we exercise write failovers with extreme caution, neither too frequently nor during transient failures.

The following graph depicts that during a zonal failure, automatic read failover prevents any impact to availability.

The following graph depicts that during a networking slowdown in a zone, write failover helps recover availability.

To ensure that the zonal failover mechanism is predictable (able to seamlessly shift traffic during an actual failure event), we regularly exercise failovers and keep rotating active and standby zones even during steady state. This not only verifies all network paths, ensuring we don’t hit surprises like clock skews, stale credentials, or networking issues during failover, but it also keeps gradually shifting caches to avoid cold starts on failovers, ensuring we deliver consistent performance at all times.

Improving the resiliency of the service

OpenSearch Service uses several principles and best practices to increase reliability, like automatic detection and faster recovery from failure, throttling excess requests, fail fast strategies, limiting queue sizes, quickly adapting to meet workload demands, implementing loosely coupled dependencies, continuously testing for failures, and more. We discuss a few of these methods in this section.

Automatic failure detection and recovery

All faults are monitored at one-minute granularity, across multiple sub-minute metric data points. Once a fault is detected, the system automatically triggers a recovery action on the impacted node. Although most classes of failures discussed so far in this post refer to binary failures where the failure is definitive, there is another kind of failure: non-binary failures, termed gray failures, whose manifestations are subtle and usually defy quick detection. Slow disk I/O is one example, which causes performance to be adversely impacted. The monitoring system detects anomalies in I/O wait times, latencies, and throughput to detect and replace a node with slow I/O. Fast, effective detection and quick recovery are our best bet against a wide variety of infrastructure failures beyond our control.
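
One simple way to separate a gray failure from a transient spike is to require several consecutive data points above a threshold before acting. The following is a minimal sketch of that idea only; the threshold, sample values, and function name are assumptions and do not describe the service's actual anomaly detection.

```python
from typing import Sequence


def has_sustained_anomaly(samples: Sequence[float],
                          threshold: float,
                          min_consecutive: int) -> bool:
    """Return True if at least min_consecutive samples in a row exceed threshold."""
    run = 0
    for value in samples:
        run = run + 1 if value > threshold else 0
        if run >= min_consecutive:
            return True
    return False


# Hypothetical I/O wait percentages sampled every 10 seconds within one minute.
healthy_node = [3.0, 4.5, 22.0, 3.8, 4.1, 3.2]  # one transient spike
slow_disk_node = [18.0, 21.5, 25.0, 24.2, 26.8, 23.1]

print(has_sustained_anomaly(healthy_node, threshold=15.0, min_consecutive=3))    # False
print(has_sustained_anomaly(slow_disk_node, threshold=15.0, min_consecutive=3))  # True
```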

Effective workload management in a dynamic environment

We’ve studied workload patterns that cause the system either to be overloaded with too many requests, maxing out CPU or memory, or to be destabilized by a few rogue queries that allocate huge chunks of memory or runaway queries that exhaust multiple cores, either degrading the latencies of other critical requests or causing multiple nodes to fail as the system’s resources run low. Some of the improvements in this direction are being made as part of search backpressure initiatives, starting with tracking the request footprint at various checkpoints, which prevents the system from accommodating more requests and cancels the ones already running if they breach resource limits for a sustained duration. To supplement backpressure in traffic shaping, we use admission control, which provides capabilities to reject a request at the entry point to avoid doing non-productive work (requests that either time out or get cancelled) when the system is already running high on CPU and memory. Most of the workload management mechanisms have configurable knobs. No one size fits all workloads, so we use Auto-Tune to control them more granularly.
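
The two mechanisms act at different points: admission control decides at the entry point whether to accept a request, and backpressure decides which running tasks to cancel. The sketch below illustrates both in simplified form; the class names, limits, and fields are hypothetical and are not OpenSearch settings.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class NodeStats:
    cpu_percent: float
    memory_percent: float


@dataclass
class RunningTask:
    task_id: str
    heap_bytes: int
    seconds_over_limit: float


def admit(stats: NodeStats, cpu_limit: float = 90.0, memory_limit: float = 85.0) -> bool:
    """Admission control: reject new work at the entry point when the node is hot."""
    return stats.cpu_percent < cpu_limit and stats.memory_percent < memory_limit


def tasks_to_cancel(tasks: List[RunningTask], heap_limit: int,
                    grace_seconds: float) -> List[str]:
    """Backpressure: cancel tasks that stay over the heap limit beyond the grace period."""
    return [t.task_id for t in tasks
            if t.heap_bytes > heap_limit and t.seconds_over_limit >= grace_seconds]


print(admit(NodeStats(cpu_percent=95.0, memory_percent=60.0)))  # False
tasks = [RunningTask("q1", 50_000_000, 0.0), RunningTask("q2", 900_000_000, 12.0)]
print(tasks_to_cancel(tasks, heap_limit=500_000_000, grace_seconds=10.0))  # ['q2']
```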

The cluster manager performs critical coordination tasks like metadata management and cluster formation, and orchestrates a few background operations like snapshot and shard placement. We added a task throttler to control the rate of dynamic mapping updates, snapshot tasks, and so on to prevent overwhelming it and to let critical operations run deterministically all the time. But what happens when there is no cluster manager in the cluster? The next section covers how we solved this.
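
As a rough illustration of task throttling, the following sketch caps the number of pending tasks per task type and rejects submissions beyond that cap. The class, the task type names, and the limits are hypothetical; the real throttler's keys and behavior may differ.

```python
from collections import Counter
from typing import Dict


class TaskThrottler:
    """Reject cluster manager tasks once a per-type pending limit is reached."""

    def __init__(self, limits: Dict[str, int]) -> None:
        self._limits = limits
        self._pending: Counter = Counter()

    def try_submit(self, task_type: str) -> bool:
        limit = self._limits.get(task_type)
        if limit is not None and self._pending[task_type] >= limit:
            return False  # throttled: the caller should retry later
        self._pending[task_type] += 1
        return True

    def complete(self, task_type: str) -> None:
        if self._pending[task_type] > 0:
            self._pending[task_type] -= 1


throttler = TaskThrottler({"put-mapping": 2})
print([throttler.try_submit("put-mapping") for _ in range(3)])  # [True, True, False]
print(throttler.try_submit("create-index"))                     # True (no limit set)
```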

Decoupling critical dependencies

In the event of cluster manager failure, searches continue as usual, but all write requests start to fail. We concluded that allowing writes in this state should still be safe as long as it doesn’t need to update the cluster metadata. This change further improves the write availability without compromising data consistency. Other service dependencies were evaluated to ensure downstream dependencies can scale as the cluster grows.

Failure mode testing

Although it’s hard to mimic all kinds of failures, we rely on AWS Fault Injection Simulator (AWS FIS) to inject common faults in the system like node failures, disk impairment, or network impairment. Testing with AWS FIS regularly in our pipelines helps us improve our detection, monitoring, and recovery times.

Contributing to open source

OpenSearch is open-source, community-driven software. Most of the changes, including the high availability design to support active and standby zones, have been contributed to open source; in fact, we follow an open-source-first development model. The fundamental primitive that enables zonal traffic shift and failover is based on a weighted traffic routing policy (active zones are assigned a weight of 1 and standby zones are assigned a weight of 0). Write failovers use the zonal decommission action, which evacuates all traffic from a given zone. Resiliency improvements for search backpressure and cluster manager task throttling are some of the ongoing efforts. If you’re excited to contribute to OpenSearch, open up a GitHub issue and let us know your thoughts.
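
The weighted routing policy can be pictured as a small document that maps each zone to a weight. The sketch below only builds such a document. Its shape is an assumption modeled on the weighted routing API in open-source OpenSearch (a PUT to /_cluster/routing/awareness/<attribute>/weights); the exact endpoint, value types, and any versioning field should be checked against the OpenSearch documentation.

```python
import json
from typing import Any, Dict, Iterable


def zone_weights_body(active: Iterable[str], standby: Iterable[str]) -> Dict[str, Any]:
    """Build a weights document: 1 for active zones, 0 for standby zones."""
    weights = {zone: "1" for zone in active}
    weights.update({zone: "0" for zone in standby})
    return {"weights": weights}


# Zone names are hypothetical.
body = zone_weights_body(active=["zone_1", "zone_2"], standby=["zone_3"])
print(json.dumps(body))
```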

Summary

Improving reliability is a never-ending cycle as we continue to learn and improve. With the Multi-AZ with standby feature, OpenSearch Service has integrated best practices for cluster configuration, improved workload management, and achieved four 9s of availability and consistent performance. OpenSearch Service also raised the bar by continuously verifying availability with zonal traffic rotations and automated tests via AWS FIS.

We are excited to continue our efforts into improving the reliability and fault tolerance even further and to see what new and existing solutions builders can create using OpenSearch Service. We hope this leads to a deeper understanding of the right level of availability based on the needs of your business and how this offering achieves the availability SLA. We would love to hear from you, especially about your success stories achieving high levels of availability on AWS. If you have other questions, please leave a comment.


About the authors

Bukhtawar Khan is a Principal Engineer working on Amazon OpenSearch Service. He is interested in building distributed and autonomous systems. He is a maintainer and an active contributor to OpenSearch.

Gaurav Bafna is a Senior Software Engineer working on OpenSearch at Amazon Web Services. He is fascinated about solving problems in distributed systems. He is a maintainer and an active contributor to OpenSearch.

Murali Krishna is a Senior Principal Engineer at AWS OpenSearch Service. He has built AWS OpenSearch Service and AWS CloudSearch. His areas of expertise include Information Retrieval, Large scale distributed computing, low latency real time serving systems etc. He has vast experience in designing and building web scale systems for crawling, processing, indexing and serving text and multimedia content. Prior to Amazon, he was part of Yahoo!, building crawling and indexing systems for their search products.

Ranjith Ramachandra is a Senior Engineering Manager working on Amazon OpenSearch Service. He is passionate about highly scalable distributed systems, high performance and resilient systems.

Rohin Bhargava is a Sr. Product Manager with the Amazon OpenSearch Service team. His passion at AWS is to help customers find the correct mix of AWS services to achieve success for their business goals.

Scale your AWS Glue for Apache Spark jobs with new larger worker types G.4X and G.8X

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/scale-your-aws-glue-for-apache-spark-jobs-with-new-larger-worker-types-g-4x-and-g-8x/

Hundreds of thousands of customers use AWS Glue, a serverless data integration service, to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue for Apache Spark jobs work with your code and configuration of the number of data processing units (DPU). Each DPU provides 4 vCPU, 16 GB memory, and 64 GB disk. AWS Glue manages running Spark and adjusts workers to achieve the best price performance. For workloads such as data transforms, joins, and queries, you can use G.1X (1 DPU) and G.2X (2 DPU) workers, which offer a scalable and cost-effective way to run most jobs. With exponentially growing data sources and data lakes, customers want to run more data integration workloads, including their most demanding transforms, aggregations, joins, and queries. These workloads require higher compute, memory, and storage per worker.

Today we are pleased to announce the general availability of AWS Glue G.4X (4 DPU) and G.8X (8 DPU) workers, the next series of AWS Glue workers for the most demanding data integration workloads. G.4X and G.8X workers offer increased compute, memory, and storage, making it possible for you to vertically scale and run intensive data integration jobs, such as memory-intensive data transforms, skewed aggregations, and entity detection checks involving petabytes of data. Larger worker types not only benefit the Spark executors, but also help in cases where the Spark driver needs larger capacity, for instance, because the job query plan is quite large.

This post demonstrates how AWS Glue G.4X and G.8X workers help you scale your AWS Glue for Apache Spark jobs.

G.4X and G.8X workers

AWS Glue G.4X and G.8X workers give you more compute, memory, and storage to run your most demanding jobs. G.4X workers provide 4 DPU, with 16 vCPU, 64 GB memory, and 256 GB of disk per node. G.8X workers provide 8 DPU, with 32 vCPU, 128 GB memory, and 512 GB of disk per node. You can enable G.4X and G.8X workers with a single parameter change in the API, AWS Command Line Interface (AWS CLI), or visually in AWS Glue Studio. Regardless of the worker used, all AWS Glue jobs have the same capabilities, including auto scaling and interactive job authoring via notebooks. G.4X and G.8X workers are available with AWS Glue 3.0 and 4.0.

The following table shows compute, memory, disk, and Spark configurations per worker type in AWS Glue 3.0 or later.

AWS Glue Worker Type | DPU per Node | vCPU | Memory (GB) | Disk (GB) | Number of Spark Executors per Node | Number of Cores per Spark Executor
G.1X | 1 | 4 | 16 | 64 | 1 | 4
G.2X | 2 | 8 | 32 | 128 | 1 | 8
G.4X (new) | 4 | 16 | 64 | 256 | 1 | 16
G.8X (new) | 8 | 32 | 128 | 512 | 1 | 32
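
The rows in this table follow directly from the per-DPU figures given earlier (4 vCPU, 16 GB memory, and 64 GB disk per DPU). The following sketch reproduces that arithmetic; the function and constant names are illustrative.

```python
from typing import Dict

# Per-DPU resources as stated in this post.
VCPU_PER_DPU = 4
MEMORY_GB_PER_DPU = 16
DISK_GB_PER_DPU = 64

DPU_PER_WORKER: Dict[str, int] = {"G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8}


def worker_resources(worker_type: str, number_of_workers: int = 1) -> Dict[str, int]:
    """Total DPU, vCPU, memory, and disk for a number of workers of one type."""
    dpu = DPU_PER_WORKER[worker_type] * number_of_workers
    return {
        "dpu": dpu,
        "vcpu": dpu * VCPU_PER_DPU,
        "memory_gb": dpu * MEMORY_GB_PER_DPU,
        "disk_gb": dpu * DISK_GB_PER_DPU,
    }


print(worker_resources("G.8X"))
# {'dpu': 8, 'vcpu': 32, 'memory_gb': 128, 'disk_gb': 512}
print(worker_resources("G.4X", number_of_workers=30)["dpu"])  # 120
```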

To use G.4X and G.8X workers on an AWS Glue job, change the setting of the worker type parameter to G.4X or G.8X. In AWS Glue Studio, you can choose G 4X or G 8X under Worker type.

In the AWS API or AWS SDK, you can specify G.4X or G.8X in the WorkerType parameter. In the AWS CLI, you can use the --worker-type parameter in a create-job command.
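
As a sketch of the SDK route, the following Python code builds the arguments for the AWS Glue CreateJob API with the WorkerType parameter set, and submits them with boto3 only when create_job is called. The job name, role ARN, and script location are hypothetical placeholders, and the helper functions are illustrative rather than part of any AWS library.

```python
from typing import Any, Dict


def build_create_job_request(name: str, role_arn: str, script_location: str,
                             worker_type: str = "G.4X",
                             number_of_workers: int = 10) -> Dict[str, Any]:
    """Build keyword arguments for the AWS Glue CreateJob API."""
    # Only the worker types discussed in this post are accepted by this sketch.
    if worker_type not in {"G.1X", "G.2X", "G.4X", "G.8X"}:
        raise ValueError(f"unexpected worker type: {worker_type}")
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {"Name": "glueetl", "ScriptLocation": script_location,
                    "PythonVersion": "3"},
        "GlueVersion": "4.0",
        "WorkerType": worker_type,
        "NumberOfWorkers": number_of_workers,
    }


def create_job(**kwargs: Any) -> Dict[str, Any]:
    """Submit the request with boto3 (requires AWS credentials and permissions)."""
    import boto3  # imported lazily so the builder above works without boto3 installed
    return boto3.client("glue").create_job(**build_create_job_request(**kwargs))


request = build_create_job_request(
    name="example-g8x-job",                                     # hypothetical job name
    role_arn="arn:aws:iam::123456789012:role/ExampleGlueRole",  # hypothetical role
    script_location="s3://example-bucket/scripts/job.py",       # hypothetical script path
    worker_type="G.8X",
)
print(request["WorkerType"], request["NumberOfWorkers"])  # G.8X 10
```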

To use G.4X and G.8X workers in an AWS Glue Studio notebook or interactive sessions, set G.4X or G.8X in the %worker_type magic (for example, %worker_type G.4X).

Performance characteristics using the TPC-DS benchmark

In this section, we use the TPC-DS benchmark to showcase performance characteristics of the new G.4X and G.8X worker types. We used AWS Glue version 4.0 jobs.

G.2X, G.4X, and G.8X results with the same number of workers

Compared to the G.2X worker type, the G.4X worker has 2 times the DPUs and the G.8X worker has 4 times the DPUs. We ran over 100 TPC-DS queries against the 3 TB TPC-DS dataset with the same number of workers but on different worker types. The following table shows the results of the benchmark.

Worker Type | Number of Workers | Number of DPUs | Duration (minutes) | Cost at $0.44/DPU-hour
G.2X | 30 | 60 | 537.4 | $236.46
G.4X | 30 | 120 | 264.6 | $232.85
G.8X | 30 | 240 | 122.6 | $215.78

When running jobs on the same number of workers, the new G.4X and G.8X workers achieved roughly linear vertical scalability.

G.2X, G.4X, and G.8X results with the same number of DPUs

We ran over 100 TPC-DS queries against the 10 TB TPC-DS dataset with the same number of DPUs but on different worker types. The following table shows the results of the experiments.

Worker Type | Number of Workers | Number of DPUs | Duration (minutes) | Cost at $0.44/DPU-hour
G.2X | 40 | 80 | 1323 | $776.16
G.4X | 20 | 80 | 1191 | $698.72
G.8X | 10 | 80 | 1190 | $698.13

When running jobs on the same number of total DPUs, the job performance stayed mostly the same with new worker types.
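
The cost column in both benchmark tables follows from DPUs multiplied by duration in hours and the $0.44 per DPU-hour rate. The following sketch reproduces those figures and the speedups relative to G.2X; the function names are illustrative.

```python
def job_cost(dpus: int, duration_minutes: float, rate_per_dpu_hour: float = 0.44) -> float:
    """Cost in dollars = DPUs x hours x price per DPU-hour."""
    return round(dpus * duration_minutes / 60 * rate_per_dpu_hour, 2)


def speedup(baseline_minutes: float, minutes: float) -> float:
    """How many times faster a run is than the baseline run."""
    return round(baseline_minutes / minutes, 2)


# Same number of workers (30), 3 TB dataset.
print(job_cost(60, 537.4), job_cost(120, 264.6), job_cost(240, 122.6))
# 236.46 232.85 215.78
print(speedup(537.4, 264.6), speedup(537.4, 122.6))
# 2.03 4.38

# Same number of DPUs (80), 10 TB dataset.
print(job_cost(80, 1323), job_cost(80, 1191), job_cost(80, 1190))
# 776.16 698.72 698.13
```

With the same number of workers, doubling and quadrupling the DPUs gave about 2.03 and 4.38 times the speed at a similar or lower cost, which is the roughly linear scaling noted earlier.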

Example: Memory-intensive transformations

Data transformations are an essential step to preprocess and structure your data into an optimal form. Bigger memory footprints are consumed in some transformations such as aggregation, join, your own custom logic using user-defined functions (UDFs), and so on. The new G.4X and G.8X workers enable you to run larger memory-intensive transformations at scale.

The following example reads large JSON files compressed in GZIP from an input Amazon Simple Storage Service (Amazon S3) location, performs groupBy, calculates groups based on K-means clustering using a Pandas UDF, then shows the results. Note that this UDF-based K-means is used just for illustration purposes; it’s recommended to use native K-means clustering for production purposes.

With G.2X workers

When the AWS Glue job ran on 12 G.2X workers (24 DPU), it failed with a No space left on device error. On the Spark UI, the Stages tab for the failed stage shows that there were multiple failed tasks in the AWS Glue job due to the error.

The Executors tab shows failed tasks per executor.

Generally, G.2X workers can process memory-intensive workloads well. This time, we used a special Pandas UDF that consumes a significant amount of memory, and it caused a failure due to a large amount of shuffle writes.

With G.8X workers

When the AWS Glue job ran on 3 G.8X workers (24 DPU), it succeeded without any failures, as shown on the Spark UI’s Jobs tab.

The Executors tab also shows that there were no failed tasks.

From this result, we observed that G.8X workers processed the same workload without failures.

Conclusion

In this post, we demonstrated how AWS Glue G.4X and G.8X workers can help you vertically scale your AWS Glue for Apache Spark jobs. G.4X and G.8X workers are available today in US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). You can start using the new G.4X and G.8X worker types to scale your workload from today. To get started with AWS Glue, visit AWS Glue.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Tomohiro Tanaka is a Senior Cloud Support Engineer on the AWS Support team. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he enjoys coffee breaks with his colleagues and making coffee at home.

Chuhan Liu is a Software Development Engineer on the AWS Glue team. He is passionate about building scalable distributed systems for big data processing, analytics, and management. In his spare time, he enjoys playing tennis.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytic services. In his spare time, he enjoys skiing and gardening.

New scatter plot options in Amazon QuickSight to visualize your data

Post Syndicated from Bhupinder Chadha original https://aws.amazon.com/blogs/big-data/new-scatter-plot-options-in-amazon-quicksight-to-visualize-your-data/

Are you looking to understand the relationships between two numerical variables? The scatter plot is a powerful visual type that allows you to identify patterns, outliers, and the strength of relationships between variables. In this post, we walk you through the newly launched scatter plot features in Amazon QuickSight, which will help you take your correlation analysis to the next level.

Feature overview

The scatter plot is undoubtedly one of the most effective visualizations for correlation analysis, helping to identify patterns, outliers, and the strength of the relationship between two or three variables (using a bubble chart). We have improved the performance and versatility of our scatter plots, supporting five additional use cases. The following functionalities have been added in this release:

  • Display unaggregated values – Previously, when there was no field placed on Color, QuickSight displayed unaggregated values, and when a field was placed on Color, the metrics would be aggregated and grouped by that dimension. Now, you can choose to plot unaggregated values even if you’re using a field on Color by using the new aggregate option called None from the field menu, in addition to aggregation options like Sum, Min, and Max. If one value is set to be aggregated, the other value will be automatically set as aggregated, and the same applies to unaggregated scenarios. Mixed aggregation scenarios are not supported, meaning that one value can’t be set as aggregated while the other is unaggregated. It’s worth noting that the unaggregated scenario (the None option) is only supported for numerical values, whereas categorical values (like dates and dimensions) will only display aggregate values such as Count and Count distinct.
  • Support for an additional Label field – We’re introducing a new field well called Label alongside the existing Color field. This will allow you to color by one field and label by another, providing more flexibility in data visualization.
  • Faster load time – The load time is up to six times faster, which impacts both new and existing use cases. Upon launch, you’ll notice that scatter plots render noticeably faster, especially when dealing with larger datasets.

Explore advanced scatter plot use cases

You can choose to set both X and Y values to either aggregated or unaggregated (the None option) from the X and Y axis field menus. This will define if values will be aggregated by dimensions in the Color and Label field wells or not. To get started, add the required fields and choose the appropriate aggregation based on your use case.
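
The difference between the two modes can be modeled outside QuickSight with a few rows of data: unaggregated plotting keeps one point per row, whereas aggregated plotting produces one point per value of the grouping dimension. The sketch below uses Sum as the aggregation and hypothetical data; it illustrates the behavior described here and is not QuickSight's implementation.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, float, float]  # (color dimension, x value, y value)


def unaggregated_points(rows: List[Row]) -> List[Tuple[float, float, str]]:
    """One plotted point per row; the dimension only decides the color."""
    return [(x, y, color) for color, x, y in rows]


def aggregated_points(rows: List[Row]) -> List[Tuple[float, float, str]]:
    """One plotted point per color value, with X and Y both summed."""
    sums: Dict[str, List[float]] = defaultdict(lambda: [0.0, 0.0])
    for color, x, y in rows:
        sums[color][0] += x
        sums[color][1] += y
    return [(x, y, color) for color, (x, y) in sums.items()]


rows = [("East", 10.0, 1.0), ("East", 20.0, 2.0), ("West", 5.0, 4.0)]
print(len(unaggregated_points(rows)))  # 3
print(aggregated_points(rows))         # [(30.0, 3.0, 'East'), (5.0, 4.0, 'West')]
```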

Unaggregated use cases

The following screenshot shows an example of unaggregated X and Y value with Color.

The following screenshot shows an example of unaggregated X and Y with Label.

The following screenshot shows an example of unaggregated X and Y with Color and Label.

Aggregated use cases

The following screenshot shows an example of X and Y aggregated by Color.

The following screenshot shows an example of X and Y aggregated by Label.

The following screenshot shows an example of X and Y aggregated by Color and Label.

Conclusion

In summary, our enhanced scatter plots offer users greater performance and versatility, catering to a wider range of use cases than before. The ability to display unaggregated values and support for additional label fields gives users the flexibility they need to visualize the data they want. For further details, refer to Amazon QuickSight Scatterplot. Try out the new scatter plot updates and let us know your feedback in the comments section.


About the authors

Bhupinder Chadha is a senior product manager for Amazon QuickSight focused on visualization and front-end experiences. He is passionate about BI, data visualization, and low-code/no-code experiences. Prior to QuickSight, he was the lead product manager for Inforiver, responsible for building an enterprise BI product from the ground up. Bhupinder started his career in presales, followed by a short stint in consulting, and then worked as PM for xViz, an add-on visualization product.

Single sign-on with Amazon Redshift Serverless with Okta using Amazon Redshift Query Editor v2 and third-party SQL clients

Post Syndicated from Maneesh Sharma original https://aws.amazon.com/blogs/big-data/single-sign-on-with-amazon-redshift-serverless-with-okta-using-amazon-redshift-query-editor-v2-and-third-party-sql-clients/

Amazon Redshift Serverless makes it easy to run and scale analytics in seconds without the need to set up and manage data warehouse clusters. With Redshift Serverless, users such as data analysts, developers, business professionals, and data scientists can get insights from data by simply loading and querying data in the data warehouse.

Customers use their preferred SQL clients to analyze their data in Redshift Serverless. They want to use an identity provider (IdP) or single sign-on (SSO) credentials to connect to Redshift Serverless so they can reuse existing user credentials and avoid additional user setup and configuration. When you use AWS Identity and Access Management (IAM) or IdP-based credentials to connect to a serverless data warehouse, Amazon Redshift automatically creates a database user for the end-user. You can simplify managing user privileges by using role-based access control. Admins can use a database-role mapping for SSO with the IAM roles that users are assigned to, so users get their database privileges automatically. With this integration, organizations can simplify user management because they no longer need to create users and map them to database roles manually. You can define the mapped database roles as a principal tag for the IdP groups or IAM role, so users who are members of those IdP groups are granted the corresponding Amazon Redshift database roles automatically.

In this post, we focus on Okta as the IdP and provide step-by-step guidance to integrate Redshift Serverless with Okta using the Amazon Redshift Query Editor V2 and with SQL clients like SQL Workbench/J. You can use this mechanism with other IdP providers such as Azure Active Directory or Ping with any applications or tools using Amazon’s JDBC/ODBC/Python driver.

Solution overview

The following diagram illustrates the authentication flow of Okta with Redshift Serverless using federated IAM roles and automatic database-role mapping.

The workflow contains the following steps:

  1. Either the user chooses an IdP app in their browser, or the SQL client initiates a user authentication request to the IdP (Okta).
  2. Upon a successful authentication, Okta submits a request to the AWS federation endpoint with a SAML assertion containing the PrincipalTags.
  3. The AWS federation endpoint validates the SAML assertion and invokes the AWS Security Token Service (AWS STS) API AssumeRoleWithSAML. The SAML assertion contains the IdP user and group information that is stored in the RedshiftDbUser and RedshiftDbRoles principal tags, respectively. Temporary IAM credentials are returned to the SQL client or, if using the Query Editor v2, the user’s browser is redirected to the Query Editor v2 console using the temporary IAM credentials.
  4. The temporary IAM credentials are used by the SQL client or Query Editor v2 to call the Redshift Serverless GetCredentials API. The API uses the principal tags to determine the user and database roles that the user belongs to. An associated database user is created if the user is signing in for the first time and is granted the matching database roles automatically. A temporary password is returned to the SQL client.
  5. Using the database user and temporary password, the SQL client or Query Editor v2 connects to Redshift Serverless. Upon login, the user is authorized based on the Amazon Redshift database roles that were assigned in Step 4.
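Steps 3 and 4 of the workflow can be sketched in Python as follows. This is a minimal sketch, not the code the SQL client or Query Editor v2 runs. It assumes boto3-style clients: `assume_role_with_saml` (AWS STS) and `get_credentials` (Redshift Serverless) are real boto3 operations, while the function name and the `client_factory` parameter are hypothetical and exist only so the flow can be exercised without AWS access.

```python
def federate_with_saml(client_factory, *, role_arn, principal_arn,
                       saml_assertion, workgroup_name, db_name):
    """Exchange a SAML assertion for a temporary database user and password.

    client_factory is any callable with the shape of boto3.client
    (service name plus optional credential keyword arguments).
    """
    # Step 3: trade the base64-encoded SAML assertion for temporary IAM credentials.
    sts = client_factory("sts")
    iam_credentials = sts.assume_role_with_saml(
        RoleArn=role_arn,
        PrincipalArn=principal_arn,
        SAMLAssertion=saml_assertion,
    )["Credentials"]

    # Step 4: call GetCredentials with the temporary IAM credentials so the
    # principal tags carried by the session decide the database user and roles.
    serverless = client_factory(
        "redshift-serverless",
        aws_access_key_id=iam_credentials["AccessKeyId"],
        aws_secret_access_key=iam_credentials["SecretAccessKey"],
        aws_session_token=iam_credentials["SessionToken"],
    )
    db_credentials = serverless.get_credentials(
        workgroupName=workgroup_name,
        dbName=db_name,
    )
    return db_credentials["dbUser"], db_credentials["dbPassword"]
```

With boto3 installed and a Region configured in the environment, you could pass `boto3.client` as `client_factory`. The returned user name and temporary password are what step 5 uses to open the database connection.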

To set up the solution, we complete the following steps:

  1. Set up your Okta application:
    • Create Okta users.
    • Create groups and assign groups to users.
    • Create the Okta SAML application.
    • Collect Okta information.
  2. Set up AWS configuration:
    • Create the IAM IdP.
    • Create the IAM role and policy.
  3. Configure Redshift Serverless role-based access.
  4. Federate to Redshift Serverless using the Query Editor V2.
  5. Configure the SQL client (for this post, we use SQL Workbench/J).
  6. Optionally, implement MFA with SQL Client and Query Editor V2.

Prerequisites

You need the following prerequisites to set up this solution:

Set up Okta application

In this section, we provide the steps to configure your Okta application.

Create Okta users

To create your Okta users, complete the following steps:

  1. Sign in to your Okta organization as a user with administrative privileges.
  2. On the admin console, under Directory in the navigation pane, choose People.
  3. Choose Add person.
  4. For First Name, enter the user’s first name.
  5. For Last Name, enter the user’s last name.
  6. For Username, enter the user’s user name in email format.
  7. Select I will set password and enter a password.
  8. Optionally, deselect User must change password on first login if you don’t want the user to change their password when they first sign in. Choose Save.

Create groups and assign groups to users

To create your groups and assign them to users, complete the following steps:

  1. Sign in to your Okta organization as a user with administrative privileges.
  2. On the admin console, under Directory in the navigation pane, choose Groups.
  3. Choose Add group.
  4. Enter a group name and choose Save.
  5. Choose the recently created group and then choose Assign people.
  6. Choose the plus sign and then choose Done.
  7. Repeat Steps 1–6 to add more groups.

In this post, we create two groups: sales and finance.

Create an Okta SAML application

To create your Okta SAML application, complete the following steps:

  1. Sign in to your Okta organization as a user with administrative privileges.
  2. On the admin console, under Applications in the navigation pane, choose Applications.
  3. Choose Create App Integration.
  4. Select SAML 2.0 as the sign-in method and choose Next.
  5. Enter a name for your app integration (for example, redshift_app) and choose Next.
  6. Enter the following values in the app and leave the rest as is:
    • For Single Sign On URL, enter https://signin.aws.amazon.com/saml.
    • For Audience URI (SP Entity ID), enter urn:amazon:webservices.
    • For Name ID format, enter EmailAddress.
  7. Choose Next.
  8. Choose I’m an Okta customer adding an internal app followed by This is an internal app that we have created.
  9. Choose Finish.
  10. Choose Assignments and then choose Assign.
  11. Choose Assign to groups and then select Assign next to the groups that you want to add.
  12. Choose Done.

Set up Okta advanced configuration

After you create the custom SAML app, complete the following steps:

  1. On the admin console, navigate to General and choose Edit under SAML settings.
  2. Choose Next.
  3. Set Default Relay State to the Query Editor V2 URL, using the format https://<region>.console.aws.amazon.com/sqlworkbench/home. For this post, we use https://us-west-2.console.aws.amazon.com/sqlworkbench/home.
  4. Under Attribute Statements (optional), add the following properties:
    • Provide the IAM role and IdP in comma-separated format using the Role attribute. You’ll create this same IAM role and IdP in a later step when setting up AWS configuration.
    • Set user.login for RoleSessionName. This is used as an identifier for the temporary credentials that are issued when the role is assumed.
    • Set the DB roles using PrincipalTag:RedshiftDbRoles. This uses the Okta groups to fill the principal tags and map them automatically with the Amazon Redshift database roles. Its value must be a colon-separated list in the format role1:role2.
    • Set user.login for PrincipalTag:RedshiftDbUser. This uses the user name in the directory. This is a required tag and defines the database user that is used by Query Editor V2.
    • Set the transitive keys using TransitiveTagKeys. This prevents users from changing the session tags in case of role chaining.

The preceding tags are forwarded to the GetCredentials API to get temporary credentials for your Redshift Serverless instance and are mapped automatically to Amazon Redshift database roles. The following table summarizes the attribute statement configuration.

Name | Name Format | Format | Example
https://aws.amazon.com/SAML/Attributes/Role | Unspecified | arn:aws:iam::<yourAWSAccountID>:role/role-name,arn:aws:iam::<yourAWSAccountID>:saml-provider/provider-name | arn:aws:iam::112034567890:role/oktarole,arn:aws:iam::112034567890:saml-provider/oktaidp
https://aws.amazon.com/SAML/Attributes/RoleSessionName | Unspecified | user.login | user.login
https://aws.amazon.com/SAML/Attributes/PrincipalTag:RedshiftDbRoles | Unspecified | String.join(":", isMemberOfGroupName("group1") ? 'group1' : '', isMemberOfGroupName("group2") ? 'group2' : '') | String.join(":", isMemberOfGroupName("sales") ? 'sales' : '', isMemberOfGroupName("finance") ? 'finance' : '')
https://aws.amazon.com/SAML/Attributes/PrincipalTag:RedshiftDbUser | Unspecified | user.login | user.login
https://aws.amazon.com/SAML/Attributes/TransitiveTagKeys | Unspecified | Arrays.flatten("RedshiftDbUser", "RedshiftDbRoles") | Arrays.flatten("RedshiftDbUser", "RedshiftDbRoles")
  5. After you add the attribute claims, choose Next followed by Finish.

Your attributes should be in a format similar to that shown in the following screenshot.
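To make the expected shape of these attributes concrete, the following Python sketch builds the values that the SAML assertion should end up carrying for one user. It does not reproduce how Okta evaluates its expressions; the function name and its parameters are hypothetical, and only the attribute names and value formats come from the table above.

```python
SAML_ATTRIBUTE_PREFIX = "https://aws.amazon.com/SAML/Attributes/"


def build_saml_attributes(account_id, role_name, provider_name, user_login, groups):
    """Return the resolved attribute values for one federated user."""
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    provider_arn = f"arn:aws:iam::{account_id}:saml-provider/{provider_name}"
    return {
        # IAM role and IdP, comma-separated.
        SAML_ATTRIBUTE_PREFIX + "Role": f"{role_arn},{provider_arn}",
        # Identifier for the temporary credentials issued for the session.
        SAML_ATTRIBUTE_PREFIX + "RoleSessionName": user_login,
        # Database roles as a colon-separated list (role1:role2).
        SAML_ATTRIBUTE_PREFIX + "PrincipalTag:RedshiftDbRoles": ":".join(groups),
        # Database user that the session maps to.
        SAML_ATTRIBUTE_PREFIX + "PrincipalTag:RedshiftDbUser": user_login,
        # Tags that stay fixed if the role is chained.
        SAML_ATTRIBUTE_PREFIX + "TransitiveTagKeys": ["RedshiftDbUser", "RedshiftDbRoles"],
    }


attributes = build_saml_attributes(
    "112034567890", "oktarole", "oktaidp", "ethan@example.com", ["sales", "finance"]
)
for name, value in attributes.items():
    print(name, "=", value)
```

The user login shown here is a placeholder; the account ID, role name, and provider name are the example values from the table.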

Collect Okta information

To gather your Okta information, complete the following steps:

  1. On the Sign On tab, choose View SAML setup instructions.
  2. Note the Identity Provider Single Sign-on URL. Use this URL when connecting with any third-party SQL client such as SQL Workbench/J.
  3. Use the IdP metadata in block 4 and save the metadata file in .xml format (for example, metadata.xml).

Set up AWS configuration

In this section, we provide the steps to configure your IAM resources.

Create the IAM IdP

To create your IAM IdP, complete the following steps:

  1. On the IAM console, under Access management in the navigation pane, choose Identity providers.
  2. Choose Add provider.
  3. For Provider type, select SAML.
  4. For Provider name, enter a name.
  5. Choose Choose file and upload the metadata file (.xml) you downloaded earlier.
  6. Choose Add provider.

Create the IAM Amazon Redshift access policy

To create your IAM policy, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the Create policy page, choose the JSON tab.
  4. For the policy, enter the JSON in the following format:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "redshift-serverless:GetCredentials",
                "Resource": "<Workgroup ARN>"
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": "redshift-serverless:ListWorkgroups",
                "Resource": "*"
            }
        ]
    }

The workgroup ARN is available on the Redshift Serverless workgroup configuration page.

The following example policy includes only a single Redshift Serverless workgroup; you can modify the policy to include multiple workgroups in the Resource section:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "redshift-serverless:GetCredentials",
            "Resource": "arn:aws:redshift-serverless:us-west-2:123456789012:workgroup/4a4f12vc-123b-2d99-fd34-a12345a1e87f"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "redshift-serverless:ListWorkgroups",
            "Resource": "*"
        }
    ]
}

  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. In the Review policy section, for Name, enter the name of your policy; for example, OktaRedshiftPolicy.
  8. For Description, you can optionally enter a brief description of what the policy does.
  9. Choose Create policy.
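If you need the policy to cover several workgroups, one option is to generate the JSON rather than edit it by hand. The following is a minimal sketch under that assumption; the function name is hypothetical, and the statements mirror the example policy shown earlier, with Resource turned into a list.

```python
import json


def build_redshift_serverless_policy(workgroup_arns):
    """Build the access policy for one or more Redshift Serverless workgroups."""
    arns = list(workgroup_arns)
    if not arns:
        raise ValueError("at least one workgroup ARN is required")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": "redshift-serverless:GetCredentials",
                # A list lets a single statement cover multiple workgroups.
                "Resource": arns,
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": "redshift-serverless:ListWorkgroups",
                "Resource": "*",
            },
        ],
    }


policy = build_redshift_serverless_policy([
    "arn:aws:redshift-serverless:us-west-2:123456789012:workgroup/4a4f12vc-123b-2d99-fd34-a12345a1e87f",
])
print(json.dumps(policy, indent=4))
```

You can paste the printed JSON into the JSON tab of the Create policy page.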

Create the IAM role

To create your IAM role, complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Trusted entity type, select SAML 2.0 federation.
  4. For SAML 2.0-based provider, choose the IdP you created earlier.
  5. Select Allow programmatic and AWS Management Console access.
  6. Choose Next.
  7. Choose the policy you created earlier.
  8. Also, add the policy AmazonRedshiftQueryEditorV2ReadSharing.
  9. Choose Next.
  10. In the Review section, for Role Name, enter the name of your role; for example, oktarole.
  11. For Description, you can optionally enter a brief description of what the role does.
  12. Choose Create role.
  13. Navigate to the role that you just created and choose Trust Relationships.
  14. Choose Edit trust policy and choose TagSession under Add actions for STS.

When using session tags, trust policies for all roles connected to the IdP passing tags must have the sts:TagSession permission. For roles without this permission in the trust policy, the AssumeRole operation fails.

  15. Choose Update policy.

The following screenshot shows the role permissions.

The following screenshot shows the trust relationships.
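For reference, a trust policy with the session-tagging permission can be sketched as a Python dictionary. Treat this as an assumption-laden illustration rather than the exact policy the console generates: the function name is hypothetical, and the SAML:aud condition value is assumed to match the sign-in URL configured in the Okta app, so compare it with the policy shown in your own role before relying on it.

```python
import json


def build_trust_policy(saml_provider_arn,
                       audience="https://signin.aws.amazon.com/saml"):
    """Trust policy that lets the SAML IdP assume the role and pass session tags."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Federated": saml_provider_arn},
                # sts:TagSession is required because the IdP passes principal tags.
                "Action": ["sts:AssumeRoleWithSAML", "sts:TagSession"],
                # Assumed condition; adjust to match your own sign-in URL.
                "Condition": {"StringEquals": {"SAML:aud": audience}},
            }
        ],
    }


print(json.dumps(
    build_trust_policy("arn:aws:iam::123456789012:saml-provider/oktaidp"),
    indent=4,
))
```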

Update the advanced Okta Role Attribute

Complete the following steps:

  1. Switch back to Okta.com.
  2. Navigate to the application which you created earlier.
  3. Navigate to General and click Edit under SAML settings.
  4. Under Attribute Statements (optional), update the value for the attribute https://aws.amazon.com/SAML/Attributes/Role, using the actual role and identity provider ARN values from the previous steps. For example, arn:aws:iam::123456789012:role/oktarole,arn:aws:iam::123456789012:saml-provider/oktaidp.

Configure Redshift Serverless role-based access

In this step, we create database roles in Amazon Redshift based on the groups that you created in Okta. Make sure the role name matches with the Okta Group name.

Amazon Redshift roles simplify managing privileges required for your end-users. In this post, we create two database roles, sales and finance, and grant them access to query tables with sales and finance data, respectively. You can download this sample SQL Notebook and import into Redshift Query Editor v2 to run all cells in the notebook used in this example. Alternatively, you can copy and enter the SQL into your SQL client.

The following is the syntax to create a role in Redshift Serverless:

create role <IdP groupname>;

For example:

create role sales;
create role finance;

Create the sales and finance database schema:

create schema sales_schema;
create schema finance_schema;

Create the tables:

CREATE TABLE IF NOT EXISTS finance_schema.revenue
(
account INTEGER   ENCODE az64
,customer VARCHAR(20)   ENCODE lzo
,salesamt NUMERIC(18,0)   ENCODE az64
)
DISTSTYLE AUTO
;

insert into finance_schema.revenue values (10001, 'ABC Company', 12000);
insert into finance_schema.revenue values (10002, 'Tech Logistics', 175400);
insert into finance_schema.revenue values (10003, 'XYZ Industry', 24355);
insert into finance_schema.revenue values (10004, 'The tax experts', 186577);

CREATE TABLE IF NOT EXISTS sales_schema.store_sales
(
ID INTEGER   ENCODE az64,
Product varchar(20),
Sales_Amount INTEGER   ENCODE az64
)
DISTSTYLE AUTO
;

Insert into sales_schema.store_sales values (1,'product1',1000);
Insert into sales_schema.store_sales values (2,'product2',2000);
Insert into sales_schema.store_sales values (3,'product3',3000);
Insert into sales_schema.store_sales values (4,'product4',4000);

The following is the syntax to grant permission to the Redshift Serverless role:

GRANT { { SELECT | INSERT | UPDATE | DELETE | DROP | REFERENCES } [,...]| ALL [ PRIVILEGES ] } ON { [ TABLE ] table_name [, ...] | ALL TABLES IN SCHEMA schema_name [, ...] } TO role <IdP groupname>;

Grant the relevant permissions to each role according to your requirements. In the following example, we grant usage and select permission on sales_schema to the role sales, and usage and select permission on finance_schema to the role finance:

grant usage on schema sales_schema to role sales;
grant select on all tables in schema sales_schema to role sales;

grant usage on schema finance_schema to role finance;
grant select on all tables in schema finance_schema to role finance;
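When you have many IdP groups, the role and grant statements above follow a regular pattern, so they can be generated. The following Python sketch produces the same statements for a mapping of group name to schema; the function name and the identifier check are hypothetical additions, and the output is plain SQL text that you would still run yourself.

```python
import re

# Conservative identifier check so arbitrary text is never spliced into SQL.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def role_setup_sql(group_to_schema):
    """Return create role and grant statements for each IdP group."""
    statements = []
    for group, schema in group_to_schema.items():
        for name in (group, schema):
            if not _IDENTIFIER.match(name):
                raise ValueError(f"unsupported identifier: {name!r}")
        statements.extend([
            f"create role {group};",
            f"grant usage on schema {schema} to role {group};",
            f"grant select on all tables in schema {schema} to role {group};",
        ])
    return statements


for statement in role_setup_sql({"sales": "sales_schema", "finance": "finance_schema"}):
    print(statement)
```

Because the role name must match the Okta group name, the mapping keys double as both.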

Federate to Redshift Serverless using Query Editor V2

The RedshiftDbRoles principal tag and DBGroups are both mechanisms that can be used to integrate with an IdP. However, federating with the RedshiftDbRoles principal has some clear advantages when it comes to connecting with an IdP because it provides automatic mapping between IdP groups and Amazon Redshift database roles. Overall, RedshiftDbRoles is more flexible, easier to manage, and more secure, making it the better option for integrating Amazon Redshift with your IdP.

Now you’re ready to connect to Redshift Serverless using the Query Editor V2 and federated login:

  1. Use the SSO URL you collected earlier and log in to your Okta account with your user credentials. For this demo, we log in with user Ethan.
  2. In the Query Editor v2, right-click your Redshift Serverless instance and choose Create connection.
  3. For Authentication, select Federated user.
  4. For Database, enter the database name you want to connect to.
  5. Choose Create Connection.

User Ethan will be able to access sales_schema tables. If Ethan tries to access the tables in finance_schema, he will get a permission denied error.

Configure the SQL client (SQL Workbench/J)

To set up SQL Workbench/J, complete the following steps:

  1. Create a new connection in SQL Workbench/J and choose Redshift Serverless as the driver.
  2. Choose Manage drivers and add all the files from the downloaded AWS JDBC driver pack .zip file (remember to unzip the .zip file).
  3. For Username and Password, enter the values that you set in Okta.
  4. Capture the values for app_id, app_name, and idp_host from the Okta app embed link, which can be found on the General tab of your application.
  5. Set the following extended properties:
    • For app_id, enter the value from app embed link (for example, 0oa8p1o1RptSabT9abd0/avc8k7abc32lL4izh3b8).
    • For app_name, enter the value from app embed link (for example, dev-123456_redshift_app_2).
    • For idp_host, enter the value from app embed link (for example, dev-123456.okta.com).
    • For plugin_name, enter com.amazon.redshift.plugin.OktaCredentialsProvider.

The following screenshot shows the SQL Workbench/J extended properties.

  6. Choose OK.
  7. Choose Test from SQL Workbench/J to test the connection.
  8. When the connection is successful, choose OK.
  9. Choose OK to sign in with the users created.

User Ethan will be able to access the sales_schema tables. If Ethan tries to access the tables in the finance_schema, he will get a permission denied error.
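The extended properties in the preceding steps can be derived from the app embed link. The following Python sketch assumes the link has the form https://<idp_host>/home/<app_name>/<app_id>, which is consistent with the example values in this post but is not verified against every Okta configuration; the function name is hypothetical.

```python
from urllib.parse import urlparse


def extended_properties_from_embed_link(embed_link):
    """Split an Okta app embed link into the driver's extended properties."""
    parsed = urlparse(embed_link)
    parts = [part for part in parsed.path.split("/") if part]
    # Expected path: /home/<app_name>/<app_id>, where app_id may contain a slash.
    if len(parts) < 3 or parts[0] != "home":
        raise ValueError(f"unexpected embed link format: {embed_link!r}")
    return {
        "idp_host": parsed.netloc,
        "app_name": parts[1],
        "app_id": "/".join(parts[2:]),
        "plugin_name": "com.amazon.redshift.plugin.OktaCredentialsProvider",
    }


link = ("https://dev-123456.okta.com/home/dev-123456_redshift_app_2/"
        "0oa8p1o1RptSabT9abd0/avc8k7abc32lL4izh3b8")
for key, value in extended_properties_from_embed_link(link).items():
    print(f"{key}={value}")
```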

Congratulations! You have federated with Redshift Serverless and Okta with SQL Workbench/J using RedshiftDbRoles.

[Optional] Implement MFA with SQL Client and Query Editor V2

Implementing MFA poses an additional challenge because multi-factor authentication is an asynchronous process: the login is initiated with the first factor and completed with the second. The SAML response is returned to the appropriate listener in each scenario: the SQL client, or the AWS console in the case of Query Editor V2 (QEV2). Depending on which login options you give your users, you may need an additional Okta application. The scenarios are as follows:

  1. If you are using only QEV2 and no other SQL client, you can use MFA with Query Editor V2 with the application created earlier. No changes are required in the custom SAML application.
  2. If you are not using QEV2 and are using only a third-party SQL client (such as SQL Workbench/J), you need to modify the custom SAML app as described in the following sections.
  3. If you want to use both QEV2 and a third-party SQL client with MFA, you need to create an additional custom SAML app as described in the following sections.

Prerequisites for MFA

Each identity provider (IdP) has its own steps for enabling and managing MFA for your users. In the case of Okta, see the following guides on how to enable MFA using the Okta Verify application and by defining an authentication policy.

Steps to create/update SAML application which supports MFA for a SQL Client

  1. If you're creating a second app, follow all the steps described in section 1 (Create an Okta SAML application).
  2. Open the custom SAML app and select General.
  3. Select Edit under SAML settings.
  4. Choose Next in General Settings.
  5. Under General, update the Single sign-on URL to http://localhost:7890/redshift/.
  6. Select Next followed by Finish.

The following screenshot shows the MFA app after making these changes.

Configure SQL Client for MFA

To set up SQL Workbench/J, complete the following steps:

  1. Follow all the steps described in the section Configure the SQL client (SQL Workbench/J).
  2. Modify your connection by updating the extended properties:
    • login_url – Use the Identity Provider Single Sign-on URL from the section Collect Okta information (for example, https://dev-123456.okta.com/app/dev-123456_redshiftapp_2/abc8p6o5psS6xUhBJ517/sso/saml).
    • plugin_name – com.amazon.redshift.plugin.BrowserSamlCredentialsProvider
  3. Choose OK.
  4. Choose OK from SQL Workbench/J. You're redirected to the browser to sign in with your Okta credentials.
  5. When you're prompted for MFA, choose either Enter a code or Get a push notification.
  6. After authentication succeeds, you're redirected to a page showing that the connection was successful.
  7. With this connection profile, run the following query to return the federated user name.

Troubleshooting

If your connection didn’t work, consider the following:

  • Enable logging in the driver. For instructions, see Configure logging.
  • Make sure to use the latest Amazon Redshift JDBC driver version.
  • If you’re getting errors while setting up the application on Okta, make sure you have admin access.
  • If you can authenticate via the SQL client but get a permission issue or can’t see objects, grant the relevant permission to the role, as detailed earlier in this post.

Clean up

When you’re done testing the solution, clean up the resources to avoid incurring future charges:

  1. Delete the Redshift Serverless instance by deleting both the workgroup and the namespace.
  2. Delete the IAM roles, IAM IdPs, and IAM policies.

Conclusion

In this post, we provided step-by-step instructions to integrate Redshift Serverless with Okta using the Amazon Redshift Query Editor V2 and SQL Workbench/J with the help of federated IAM roles and automatic database-role mapping. You can use a similar setup with any other SQL client (such as DBeaver or DataGrip) or business intelligence tool (such as Tableau Desktop). We also showed how Okta group membership is mapped automatically with Redshift Serverless roles to use role-based authentication seamlessly.

For more information about Redshift Serverless single sign-on using database roles, see Defining database roles to grant to federated users in Amazon Redshift Serverless.


About the Authors

Maneesh Sharma is a Senior Database Engineer at AWS with more than a decade of experience designing and implementing large-scale data warehouse and analytics solutions. He collaborates with various Amazon Redshift Partners and customers to drive better integration.

Debu Panda is a Senior Manager, Product Management at AWS. He is an industry leader in analytics, application platform, and database technologies, and has more than 25 years of experience in the IT world.

Mohamed Shaaban is a Senior Software Engineer in Amazon Redshift and is based in Berlin, Germany. He has over 12 years of experience in software engineering. He is passionate about cloud services and building solutions that delight customers. Outside of work, he is an amateur photographer who loves to explore and capture unique moments.

Rajiv Gupta is Sr. Manager of Analytics Specialist Solutions Architects based out of Irvine, CA. He has 20+ years of experience building and managing teams who build data warehouse and business intelligence solutions.

Amol Mhatre is a Database Engineer in Amazon Redshift and works on customer and partner engagements. Prior to Amazon, he worked on multiple projects involving database and ERP implementations.

Ning Di is a Software Development Engineer at Amazon Redshift, driven by a genuine passion for exploring all aspects of technology.

Harsha Kesapragada is a Software Development Engineer for Amazon Redshift with a passion to build scalable and secure systems. In the past few years, he has been working on Redshift Datasharing, Security and Redshift Serverless.

How Encored Technologies built serverless event-driven data pipelines with AWS

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-encored-technologies-built-serverless-event-driven-data-pipelines-with-aws/

This post is a guest post co-written with SeonJeong Lee, JaeRyun Yim, and HyeonSeok Yang from Encored Technologies.

Encored Technologies (Encored) is an energy IT company in Korea that helps their customers generate higher revenue and reduce operational costs in renewable energy industries by providing various AI-based solutions. Encored develops machine learning (ML) applications predicting and optimizing various energy-related processes, and their key initiative is to predict the amount of power generated at renewable energy power plants.

In this post, we share how Encored runs data engineering pipelines for containerized ML applications on AWS and how they use AWS Lambda to achieve performance improvement, cost reduction, and operational efficiency. We also demonstrate how to use AWS services to ingest and process GRIB (GRIdded Binary) format data, which is a file format commonly used in meteorology to store and exchange weather and climate data in a compressed binary form. It allows for efficient data storage and transmission, as well as easy manipulation of the data using specialized software.

Business and technical challenge

Encored is expanding their business into multiple countries to provide power trading services for end customers. The amount of data and the number of power plants from which they need to collect data are rapidly increasing over time. For example, the volume of data required for training one of the ML models is more than 200 TB. To meet the growing requirements of the business, the data science and platform team needed to speed up the process of delivering model outputs. As a solution, Encored aimed to migrate existing data and run ML applications in the AWS Cloud environment to efficiently process a scalable and robust end-to-end data and ML pipeline.

Solution overview

The primary objective of the solution is to develop an optimized data ingestion pipeline that addresses the scaling challenges related to data ingestion. During its previous deployment in an on-premises environment, the time taken to process data from ingestion to preparing the training dataset exceeded the required service level agreement (SLA). One of the input datasets required for ML models is weather data supplied by the Korea Meteorological Administration (KMA). In order to use the GRIB datasets for the ML models, Encored needed to prepare the raw data to make it suitable for building and training ML models. The first step was to convert GRIB to the Parquet file format.

Encored used Lambda to run an existing data ingestion pipeline built in a Linux-based container image. Lambda is a compute service that lets you run code without provisioning or managing servers. Lambda runs your code on a high-availability compute infrastructure and performs all of the administration of the compute resources, including server and operating system maintenance, capacity provisioning and automatic scaling, and logging. Lambda is triggered to ingest and process GRIB data files when they are uploaded to Amazon Simple Storage Service (Amazon S3). After the files are processed, they are stored in Parquet format in another S3 bucket. Encored receives GRIB files throughout the day, and whenever new files arrive, a Lambda function runs a container image registered in Amazon Elastic Container Registry (Amazon ECR). This event-based pipeline triggers a customized data pipeline that is packaged in a container-based solution. By using Lambda, this solution is cost-effective, scalable, and high-performing. Encored uses Python as their preferred language.

The following diagram illustrates the solution architecture.


For data-intensive tasks such as extract, transform, and load (ETL) jobs and ML inference, Lambda is an ideal solution because it offers several key benefits, including rapid scaling to meet demand, automatic scaling to zero when not in use, and S3 event triggers that can initiate actions in response to object-created events. All this contributes to building a scalable and cost-effective data event-driven pipeline. In addition to these benefits, Lambda allows you to configure ephemeral storage (/tmp) between 512–10,240 MB. Encored used this storage for their data application when reading or writing data, enabling them to optimize performance and cost-effectiveness. Furthermore, Lambda’s pay-per-use pricing model means that users only pay for the compute time in use, making it a cost-effective solution for a wide range of use cases.
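As a small illustration of the ephemeral storage range mentioned above, the following Python sketch validates a requested size and builds the corresponding setting. The function name is hypothetical; the resulting dictionary has the shape of the EphemeralStorage parameter accepted by the Lambda UpdateFunctionConfiguration API.

```python
MIN_EPHEMERAL_MB = 512
MAX_EPHEMERAL_MB = 10_240


def ephemeral_storage_setting(size_mb):
    """Validate a /tmp size in MB and return the matching configuration fragment."""
    if not isinstance(size_mb, int) or not MIN_EPHEMERAL_MB <= size_mb <= MAX_EPHEMERAL_MB:
        raise ValueError(
            f"ephemeral storage must be an integer between "
            f"{MIN_EPHEMERAL_MB} and {MAX_EPHEMERAL_MB} MB, got {size_mb!r}"
        )
    return {"EphemeralStorage": {"Size": size_mb}}


print(ephemeral_storage_setting(2048))
```

With boto3, the fragment could be passed as keyword arguments, for example `boto3.client("lambda").update_function_configuration(FunctionName="process-grib", **ephemeral_storage_setting(2048))`, where the function name is a placeholder.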

Prerequisites

For this walkthrough, you should have the following:

Build your application required for your Docker image

The first step is to develop an application that can ingest and process files. This application reads the bucket name and object key passed from a trigger added to the Lambda function. The processing logic involves three parts: downloading the file from Amazon S3 into ephemeral storage (/tmp), parsing the GRIB formatted data, and saving the parsed data to Parquet format.

The customer has a Python script (for example, app.py) that performs these tasks as follows:

import os
import tempfile
import boto3
import numpy as np
import pandas as pd
import pygrib

s3_client = boto3.client('s3')
def handler(event, context):
    try:
        # Get trigger file name
        bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
        s3_file_name = event["Records"][0]["s3"]["object"]["key"]

        # Handle temp files: all temp objects are deleted when the with-clause is closed
        with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
            # Step1> Download file from s3 into temp area
            s3_file_basename = os.path.basename(s3_file_name)
            s3_file_dirname = os.path.dirname(s3_file_name)
            local_filename = tmp_file.name
            s3_client.download_file(
                Bucket=bucket_name,
                Key=f"{s3_file_dirname}/{s3_file_basename}",
                Filename=local_filename
            )

            # Step2> Parse – GRIB2 
            grbs = pygrib.open(local_filename)
            list_of_name = []
            list_of_values = []
            for grb in grbs:
                list_of_name.append(grb.name)
                list_of_values.append(grb.values)
            _, lat, lon = grb.data()
            list_of_name += ["lat", "lon"]
            list_of_values += [lat, lon]
            grbs.close()

            dat = pd.DataFrame(
                np.transpose(np.stack(list_of_values).reshape(len(list_of_values), -1)),
                columns=list_of_name,
            )

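        # Step3> To Parquet
        # NOTE: S3path is not defined in this snippet; set it to the
        # destination URI of the Parquet file before running.
        s3_dest_uri = S3path
        dat.to_parquet(s3_dest_uri, compression="snappy")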

    except Exception as err:
        print(err)
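
The handler reads the bucket name and object key straight from the event. As a minimal sketch of that step in isolation, the following uses only the standard library. The sample event and the helper name extract_s3_location are hypothetical and are not part of Encored's script. Object keys arrive URL-encoded in S3 event notifications, so the sketch decodes them with urllib.parse.unquote_plus before use.

```python
from urllib.parse import unquote_plus


def extract_s3_location(event):
    """Return (bucket, key) for the first record of an S3 event notification."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # Keys are URL-encoded in the notification (a space arrives as "+").
    key = unquote_plus(record["object"]["key"])
    return bucket, key


# Hypothetical, trimmed-down event used for illustration only.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "incoming/weather+data/file%3D01.grib2"},
            }
        }
    ]
}

bucket, key = extract_s3_location(sample_event)
print(bucket, key)
```

Decoding matters only when keys contain spaces or special characters, but doing it unconditionally keeps the download call from failing on such objects.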

Prepare a Docker file

The second step is to create a Docker image using an AWS base image. To achieve this, you can create a new Dockerfile using a text editor on your local machine. This Dockerfile should contain two environment variables:

  • LAMBDA_TASK_ROOT=/var/task
  • LAMBDA_RUNTIME_DIR=/var/runtime

It’s important to install any dependencies under the ${LAMBDA_TASK_ROOT} directory alongside the function handler to ensure that the Lambda runtime can locate them when the function is invoked. Refer to the available Lambda base images for custom runtime for more information.

FROM public.ecr.aws/lambda/python:3.8

# Install the function's dependencies using file requirements.txt
# from your project folder.

COPY requirements.txt  .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

Build a Docker image

The third step is to build your Docker image using the docker build command. When running this command, make sure to enter a name for the image. For example:

docker build -t process-grib .

In this example, the name of the image is process-grib. You can choose any name you like for your Docker image.

Upload the image to the Amazon ECR repository

Your container image needs to reside in an Amazon Elastic Container Registry (Amazon ECR) repository. Amazon ECR is a fully managed container registry offering high-performance hosting, so you can reliably deploy application images and artifacts anywhere. For instructions on creating an ECR repository, refer to Creating a private repository.

The first step is to authenticate the Docker CLI to your ECR registry as follows:

aws ecr get-login-password --region ap-northeast-2 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.ap-northeast-2.amazonaws.com 

The second step is to tag your image to match your repository name, and deploy the image to Amazon ECR using the docker push command:

docker tag process-grib:latest 123456789012.dkr.ecr.ap-northeast-2.amazonaws.com/process-grib:latest
docker push 123456789012.dkr.ecr.ap-northeast-2.amazonaws.com/process-grib:latest

Deploy Lambda functions as container images

To create your Lambda function, complete the following steps:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose Create function.
  3. Choose the Container image option.
  4. For Function name, enter a name.
  5. For Container image URI, provide a container image. You can enter the ECR image URI or browse for the ECR image.
  6. Under Container image overrides, you can override configuration settings such as the entry point or working directory that are included in the Dockerfile.
  7. Under Permissions, expand Change default execution role.
  8. Choose to create a new role or use an existing role.
  9. Choose Create function.

Key considerations

To handle a large amount of data concurrently and quickly, Encored needed to store GRIB-formatted files in the ephemeral storage (/tmp) that comes with Lambda. To meet this requirement, Encored used tempfile.NamedTemporaryFile, which makes it easy to create temporary files that are deleted when no longer needed. With Lambda, you can configure ephemeral storage between 512 MB and 10,240 MB for reading or writing data, allowing you to run ETL jobs, ML inference, or other data-intensive workloads.
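
The cleanup behavior of tempfile.NamedTemporaryFile can be seen in a small standalone sketch. The function name process_in_tmp and the payload are hypothetical. The sketch assumes the default temporary directory, which typically resolves to /tmp on Lambda.

```python
import os
import tempfile


def process_in_tmp(payload):
    """Write payload to a temporary file and report its size and cleanup state."""
    with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
        tmp_file.write(payload)
        tmp_file.flush()  # push buffered bytes to disk before measuring
        path = tmp_file.name
        size = os.path.getsize(path)
    # The file is removed as soon as the with-block exits.
    return path, size, os.path.exists(path)


path, size, still_there = process_in_tmp(b"GRIB" + b"\x00" * 1020)
print(size, still_there)
```

Because the file disappears when the with-block closes, any parsing that needs the file on disk has to finish inside the block, as the handler shown earlier does.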

Business outcome

Hyoseop Lee (CTO at Encored Technologies) said, “Encored has experienced positive outcomes since migrating to AWS Cloud. Initially, there was a perception that running workloads on AWS would be more expensive than using an on-premises environment. However, we discovered that this was not the case once we started running our applications on AWS. One of the most fascinating aspects of AWS services is the flexible architecture options it provides for processing, storing, and accessing large volumes of data that are only required infrequently.”

Conclusion

In this post, we covered how Encored built serverless data pipelines with Lambda and Amazon ECR to achieve performance improvement, cost reduction, and operational efficiency.

Encored successfully built an architecture that will support their global expansion and enhance technical capabilities through AWS services and the AWS Data Lab program. Based on the architecture and various internal datasets Encored has consolidated and curated, Encored plans to provide renewable energy forecasting and energy trading services.

Thanks for reading this post and hopefully you found it useful. To accelerate your digital transformation with ML, AWS is available to support you by providing prescriptive architectural guidance on a particular use case, sharing best practices, and removing technical roadblocks. You’ll leave the engagement with an architecture or working prototype that is custom fit to your needs, a path to production, and deeper knowledge of AWS services. Please contact your AWS Account Manager or Solutions Architect to get started. If you don’t have an AWS Account Manager, please contact Sales.

To learn more about ML inference use cases with Lambda, check out the following blog posts:

These resources will provide you with valuable insights and practical examples of how to use Lambda for ML inference.


About the Authors

SeonJeong Lee is the Head of Algorithms at Encored. She is a data practitioner who finds peace of mind from beautiful code and formulas.

JaeRyun Yim is a Senior Data Scientist at Encored. He is striving to improve both work and life by focusing on simplicity and essence in his work.

HyeonSeok Yang is the platform team lead at Encored. He always strives to work with passion and spirit, to keep challenging himself like a junior developer, and to become a role model for others.

Younggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

Improve reliability and reduce costs of your Apache Spark workloads with vertical autoscaling on Amazon EMR on EKS

Post Syndicated from Rajkishan Gunasekaran original https://aws.amazon.com/blogs/big-data/improve-reliability-and-reduce-costs-of-your-apache-spark-workloads-with-vertical-autoscaling-on-amazon-emr-on-eks/

Amazon EMR on Amazon EKS is a deployment option offered by Amazon EMR that enables you to run Apache Spark applications on Amazon Elastic Kubernetes Service (Amazon EKS) in a cost-effective manner. It uses the EMR runtime for Apache Spark to increase performance so that your jobs run faster and cost less.

Apache Spark allows you to configure the amount of memory and vCPU cores that a job will utilize. However, tuning these values is a manual process that can be complex and rife with pitfalls. For example, allocating too little memory can result in out-of-memory exceptions and poor job reliability. On the other hand, allocating too much can result in over-spending on idle resources, poor cluster utilization, and high costs. Moreover, it’s hard to right-size these settings for some use cases such as interactive analytics due to lack of visibility into future requirements. In the case of recurring jobs, keeping these settings up to date while taking into account changing load patterns (due to external seasonal factors, for instance) remains a challenge.

To address this, Amazon EMR on EKS has recently announced support for vertical autoscaling, a feature that uses the Kubernetes Vertical Pod Autoscaler (VPA) to automatically tune the memory and CPU resources of EMR Spark applications to adapt to the needs of the provided workload, simplifying the process of tuning resources and optimizing costs for these applications. You can use vertical autoscaling’s ability to tune resources based on historic data to keep memory and CPU settings up to date even when the profile of the workload varies over time. Additionally, you can use its ability to react to real-time signals to enable applications to recover from out-of-memory (OOM) exceptions, helping improve job reliability.

Vertical autoscaling vs existing autoscaling solutions

Vertical autoscaling complements existing Spark autoscaling solutions such as Dynamic Resource Allocation (DRA) and Kubernetes autoscaling solutions such as Karpenter.

Features such as DRA typically work on the horizontal axis, where an increase in load results in an increase in the number of Kubernetes pods that will process the load. In the case of Spark, this results in data being processed across additional executors. When DRA is enabled, Spark starts with an initial number of executors and scales this up if it observes that there are tasks sitting and waiting for executors to run on. DRA works at the pod level and would need an underlying cluster-level auto-scaler such as Karpenter to bring in additional nodes or scale down unused nodes in response to these pods getting created and deleted.

However, for a given data profile and a query plan, sometimes the parallelism and the number of executors can’t be easily changed. As an example, if you’re attempting to join two tables that store data already sorted and bucketed by the join keys, Spark can efficiently join the data by using a fixed number of executors that equals the number of buckets in the source data. Since the number of executors cannot be changed, vertical autoscaling can help here by offering additional resources or scaling down unused resources at the executor level. This has a few advantages:

  • If a pod is optimally sized, the Kubernetes scheduler can efficiently pack in more pods in a single node, leading to better utilization of the underlying cluster.
  • The Amazon EMR on EKS uplift is charged based on the vCPU and memory resources consumed by a Kubernetes pod. This means an optimally sized pod is cheaper.

How vertical autoscaling works

Vertical autoscaling is a feature that you can opt into at the time of submitting an EMR on EKS job. When enabled, it uses VPA to track the resource utilization of your EMR Spark jobs and derive recommendations for resource assignments for Spark executor pods based on this data. The data, fetched from the Kubernetes Metrics Server, feeds into statistical models that VPA constructs in order to build recommendations. When new executor pods spin up belonging to a job that has vertical autoscaling enabled, they’re autoscaled based on this recommendation, ignoring the usual sizing done via Spark’s executor memory configuration (controlled by the spark.executor.memory Spark setting).

Vertical autoscaling does not impact pods that are running, since in-place resizing of pods remains unsupported as of Kubernetes version 1.26, the latest supported version of Kubernetes on Amazon EKS as of this writing. However, it’s useful in the case of a recurring job where we can perform autoscaling based on historic data as well as scenarios when some pods go out-of-memory and get re-started by Spark, where vertical autoscaling can be used to selectively scale up the re-started pods and facilitate automatic recovery.

Data tracking and recommendations

To recap, vertical autoscaling uses VPA to track resource utilization for EMR jobs. For a deep dive into the functionality, refer to the VPA GitHub repo. In short, vertical autoscaling sets up VPA to track the container_memory_working_set_bytes metric for the Spark executor pods that have vertical autoscaling enabled.

Real-time metric data is fetched from the Kubernetes Metrics Server. By default, vertical autoscaling tracks the peak memory working set size for each pod and makes recommendations based on the p90 of the peak with a 40% safety margin added in. It also listens to pod events such as OOM events and reacts to these events. In the case of OOM events, VPA automatically bumps up the recommended resource assignment by 20%.
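
The arithmetic behind these defaults can be sketched in a few lines. This is a simplified illustration, not VPA's implementation: its own statistical models are more involved, the nearest-rank percentile is a choice made here for brevity, and the function names and sample peaks are hypothetical.

```python
def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct is an integer 0-100)."""
    ordered = sorted(values)
    rank = max(1, -(-pct * len(ordered) // 100))  # integer ceiling of pct * n / 100
    return ordered[rank - 1]


def recommend_memory(peaks_bytes, margin=0.40):
    """p90 of the observed peak working sets plus a safety margin."""
    return percentile(peaks_bytes, 90) * (1 + margin)


def bump_after_oom(recommendation_bytes, bump=0.20):
    """Raise a recommendation by a fixed fraction after an OOM event."""
    return recommendation_bytes * (1 + bump)


GIB = 1024 ** 3
# Hypothetical peak working-set sizes (in GiB) for ten executor pods.
peaks = [g * GIB for g in (8.0, 8.5, 9.0, 9.2, 9.5, 9.8, 10.0, 10.0, 10.0, 12.0)]

target = recommend_memory(peaks)
print(round(target / GIB, 2))                  # 14.0
print(round(bump_after_oom(target) / GIB, 2))  # 16.8
```

With these sample values the p90 peak is 10 GiB, so the 40% margin yields a 14 GiB target, and a single OOM event would raise it to 16.8 GiB.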

The statistical models, which also represent historic resource utilization data, are stored as custom resource objects on your EKS cluster. This means that deleting these objects also purges old recommendations.

Customized recommendations through job signature

One of the major use cases of vertical autoscaling is to aggregate usage data across different runs of EMR Spark jobs to derive resource recommendations. To do so, you need to provide a job signature. This can be a unique name or identifier that you configure at the time of submitting your job. If your job recurs on a fixed schedule (such as daily or weekly), it’s important that your job signature doesn’t change for each new instance of the job, so that VPA can aggregate and compute recommendations across different runs of the job.

A job signature can be the same even across different jobs if you believe they’ll have similar resource profiles. You can therefore use the signature to combine tracking and resource modeling across different jobs that you expect to behave similarly. Conversely, if a job’s behavior is changing at some point in time, such as due to a change in the upstream data or the query pattern, you can easily purge the old recommendations by either changing your signature or deleting the VPA custom resource for this signature (as explained later in this post).

Monitoring mode

You can use vertical autoscaling in a monitoring mode where no autoscaling is actually performed. If you have Prometheus set up on your cluster, recommendations are reported to it, and you can monitor them through Grafana dashboards and use them to debug and make manual changes to the resource assignments. Monitoring mode is the default, but you can override it with one of the supported autoscaling modes when you submit a job. Refer to the documentation for usage and a walkthrough on how to get started.

Monitoring vertical autoscaling through kubectl

You can use the Kubernetes command-line tool kubectl to list active recommendations on your cluster, view all the job signatures that are being tracked as well as purge resources associated with signatures that aren’t relevant anymore. In this section, we provide some example code to demonstrate listing, querying, and deleting recommendations.

List all vertical autoscaling recommendations on a cluster

You can use kubectl to get the verticalpodautoscaler resource in order to view the current status and recommendations. The following sample query lists all resources currently active on your EKS cluster:

kubectl get verticalpodautoscalers \
-o custom-columns='NAME:.metadata.name,'\
'SIGNATURE:.metadata.labels.emr-containers\.amazonaws\.com/dynamic\.sizing\.signature,'\
'MODE:.spec.updatePolicy.updateMode,'\
'MEM:.status.recommendation.containerRecommendations[0].target.memory' \
--all-namespaces

This produces output similar to the following:

NAME               SIGNATURE           MODE      MEM
ds-<some-id>-vpa   <some-signature>    Off       930143865
ds-<some-id>-vpa   <some-signature>    Initial   14291063673
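
The MEM column is a plain Kubernetes quantity without a unit suffix, which means bytes. As a small convenience sketch (the helper name bytes_to_gib is hypothetical), the values from the sample output can be converted to GiB for easier comparison with Spark memory settings:

```python
def bytes_to_gib(value):
    """Convert a byte count, as printed in the MEM column, to GiB."""
    return int(value) / 1024 ** 3


# MEM values copied from the sample output above.
for mem in ("930143865", "14291063673"):
    print(mem, "->", round(bytes_to_gib(mem), 2), "GiB")
```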

Query and delete a recommendation

You can also use kubectl to purge the recommendations for a job based on its signature. Alternatively, you can use the --all flag and skip specifying the signature to purge all the resources on your cluster. Note that in this case you’ll actually be deleting the EMR vertical autoscaling job-run resource. This is a custom resource managed by EMR; deleting it automatically deletes the associated VPA object that tracks and stores recommendations. See the following code:

kubectl delete jobrun -n emr \
-l='emr-containers.amazonaws.com/dynamic.sizing.signature=<some-signature>'
jobrun.dynamicsizing.emr.services.k8s.aws "ds-<some-id>" deleted

You can use the --all and --all-namespaces flags to delete all vertical autoscaling related resources:

kubectl delete jobruns --all --all-namespaces
jobrun.dynamicsizing.emr.services.k8s.aws "ds-<some-id>" deleted

Monitor vertical autoscaling through Prometheus and Grafana

You can use Prometheus and Grafana to monitor the vertical autoscaling functionality on your EKS cluster. This includes viewing recommendations that evolve over time for different job signatures and monitoring the autoscaling functionality itself. For this setup, we assume Prometheus and Grafana are already installed on your EKS cluster using the official Helm charts. If not, refer to the Setting up Prometheus and Grafana for monitoring the cluster section of the Running batch workloads on Amazon EKS workshop to get them up and running on your cluster.

Modify Prometheus to collect vertical autoscaling metrics

Prometheus doesn’t track vertical autoscaling metrics by default. To enable this, you’ll need to start gathering metrics from the VPA custom resource objects on your cluster. This can be easily done by patching your Helm chart with the following configuration:

helm upgrade -f prometheus-helm-values.yaml prometheus prometheus-community/prometheus -n prometheus

Here, prometheus-helm-values.yaml is the vertical autoscaling specific customization that tells Prometheus to gather vertical autoscaling related recommendations from the VPA resource objects, along with the minimal required metadata such as the job’s signature.

You can verify if this setup is working by running the following Prometheus queries for the newly created custom metrics:

  • kube_customresource_vpa_spark_rec_memory_target
  • kube_customresource_vpa_spark_rec_memory_lower
  • kube_customresource_vpa_spark_rec_memory_upper

These represent the target memory, lower bound, and upper bound, respectively, for EMR Spark jobs that have vertical autoscaling enabled. The query can be grouped or filtered using the signature label, similar to the following Prometheus query:

kube_customresource_vpa_spark_rec_memory_target{signature="<some-signature>"}

Use Grafana to visualize recommendations and autoscaling functionality

You can use our sample Grafana dashboard by importing the EMR vertical autoscaling JSON model into your Grafana deployment. The dashboard visualizes vertical autoscaling recommendations alongside the memory provisioned and actually utilized by EMR Spark applications as shown in the following screenshot.

Grafana Dashboard

Results are presented categorized by your Kubernetes namespace and job signature. When you choose a certain namespace and signature combination, you’re presented with a pane. The pane represents a comparison of the vertical autoscaling recommendations for jobs belonging to the chosen signature, compared to the actual resource utilization of that job and the amount of Spark executor memory provisioned to the job. If autoscaling is enabled, the expectation is that the Spark executor memory would track the recommendation. If you’re in monitoring mode however, the two won’t match but you can still view the recommendations from this dashboard or use them to better understand the actual utilization and resource profile of your job.

Illustration of provisioned memory, utilization and recommendations

To better illustrate vertical autoscaling behavior and usage for different workloads, we executed query 2 of the TPC-DS benchmark for five iterations: the first two in monitoring mode and the last three in autoscaling mode. We visualized the results in the Grafana dashboard shared in the previous section.

Monitoring mode

This particular job was provisioned to run with 32 GB of executor memory (the blue line in the image), but the actual utilization hovered around the 10 GB mark (the amber line). Vertical autoscaling computes a recommendation of approximately 14 GB based on this run (the green line). This recommendation is based on the actual utilization with a safety margin added in.

Cost optimization example 1

The second iteration of the job was also run in the monitoring mode and the utilization and the recommendations remained unchanged.

cost optimization example 2

Autoscaling mode

Iterations 3 through 5 were run in autoscaling mode. In this case, the provisioned memory (the blue line) drops from 32 GB to match the recommended value of approximately 14 GB.

cost optimization example 3

The utilization and recommendations remained unchanged for subsequent iterations in the case of this example. Furthermore, we observed that all the iterations of the job completed in around 5 minutes, both with and without autoscaling. This example illustrates the successful scaling down of the job’s executor memory allocation by about 56% (a drop from 32 GB to approximately 14 GB), which also translates to an equivalent reduction in the EMR memory uplift costs of the job, with no impact to the job’s performance.
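
The reduction quoted above follows directly from the two memory figures. A short check, using a hypothetical helper name:

```python
def percent_reduction(before, after):
    """Relative reduction from before to after, as a percentage."""
    return (before - after) / before * 100


# Provisioned executor memory before and after autoscaling, in GB.
print(percent_reduction(32, 14))  # 56.25
```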

Automatic OOM recovery

In the earlier example, we didn’t observe any OOM events as a result of autoscaling. On the rare occasion where autoscaling results in OOM events, jobs should usually be scaled back up automatically. On the other hand, if a job that has autoscaling enabled is under-provisioned and as a result experiences OOM events, vertical autoscaling can scale up resources to facilitate automatic recovery.

In the following example, a job was provisioned with 2.5 GB of executor memory and experienced OOM exceptions during its execution. Vertical autoscaling responded to the OOM events by automatically scaling up failed executors when they were re-started. As seen in the following image, when the amber line representing memory utilization started approaching the blue line representing the provisioned memory, vertical autoscaling kicked in to increase the amount of provisioned memory for the re-started executors, allowing the automatic recovery and successful completion of the job without any intervention. The recommended memory converged to approximately 5 GB before the job completed.

OOM recovery example

All subsequent runs of jobs with the same signature will now start up with the recommended settings computed earlier, preventing OOM events right from the start.

Cleanup

Refer to the documentation for information on cleaning up vertical autoscaling related resources from your cluster. To clean up your EMR on EKS cluster after trying out the vertical autoscaling feature, refer to the clean-up section of the EMR on EKS workshop.

Conclusion

You can use vertical autoscaling to easily monitor resource utilization for one or more EMR on EKS jobs without any impact to your production workloads. You can use standard Kubernetes tooling including Prometheus, Grafana and kubectl to interact with and monitor vertical autoscaling on your cluster. You can also autoscale your EMR Spark jobs using recommendations that are derived based on the needs of your job, allowing you to realize cost savings and optimize cluster utilization as well as build resiliency to out-of-memory errors. Additionally, you can use it in conjunction with existing autoscaling mechanisms such as Dynamic Resource Allocation and Karpenter to effortlessly achieve optimal vertical resource assignment. Looking ahead, when Kubernetes fully supports in-place resizing of pods, vertical autoscaling will be able to take advantage of it to seamlessly scale your EMR jobs up or down, further facilitating optimal costs and cluster utilization.

To learn more about EMR on EKS vertical autoscaling and how to get started with it, refer to the documentation. You can also use the EMR on EKS Workshop to try out the EMR on EKS deployment option for Amazon EMR.


About the author

Rajkishan Gunasekaran is a Principal Engineer for Amazon EMR on EKS at Amazon Web Services.

Process price transparency data using AWS Glue

Post Syndicated from Hari Thatavarthy original https://aws.amazon.com/blogs/big-data/process-price-transparency-data-using-aws-glue/

The Transparency in Coverage rule is a federal regulation in the United States that was finalized by the Centers for Medicare & Medicaid Services (CMS) in October 2020. The rule requires health insurers to provide clear and concise information to consumers about their health plan benefits, including costs and coverage details. Under the rule, health insurers must make available to their members a list of negotiated rates for in-network providers, as well as an estimate of the member’s out-of-pocket costs for specific health care services. This information must be made available to members through an online tool that is accessible and easy to use. The Transparency in Coverage rule also requires insurers to make available data files that contain detailed information on the prices they negotiate with health care providers. This information can be used by employers, researchers, and others to compare prices across different insurers and health care providers. Phase 1 implementation of this regulation, which went into effect on July 1, 2022, requires that payors publish machine-readable files publicly for each plan that they offer. CMS has published a technical implementation guide with file formats, file structure, and standards on producing these machine-readable files.

This post walks you through the preprocessing and processing steps required to prepare data published by health insurers in light of this federal regulation using AWS Glue. We also show how to query and derive insights using Amazon Athena.

AWS Glue is a serverless data integration service that makes it straightforward to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. Athena provides a simplified, flexible way to analyze petabytes of data.

Challenges processing these machine-readable files

The machine-readable files published by these payors vary in size. A single file can range from a few megabytes to hundreds of gigabytes. These files contain large JSON objects that are deeply nested. Unlike NDJSON and JSONL formats, where each line in the file is a JSON object, these files contain a single large JSON object that can span across multiple lines. The following figure represents the schema of an in_network rate file published by a major health insurer on their website for public access, as printed by the Spark printSchema() function. This file, when uncompressed, is about 20 GB in size, contains a single JSON object, and is deeply nested. Each box highlighted in red is a nested array structure.

JSON Schema

Loading a 20 GB deeply nested JSON object requires a machine with a large memory footprint. Data, when loaded into memory, is 4–10 times its size on disk, so a 20 GB JSON object may need a machine with up to 200 GB of memory. To process workloads larger than 20 GB, these machines need to be scaled vertically, thereby significantly increasing hardware costs. Vertical scaling has its limits, and it’s not possible to scale beyond a certain point. Analyzing this data requires unnesting and flattening of deeply nested array structures. These transformations multiply the row count with each level of nesting that is flattened, thereby adding to the need for more memory and disk space.

You can use an in-memory distributed processing framework such as Apache Spark to process and analyze such large volumes of data. However, to load this single large JSON object as a Spark DataFrame and perform an action on it, a worker node needs enough memory to load this object in full. When a worker node tries to load this large deeply nested JSON object and there isn’t enough memory to load it in full, the processing job will fail with out-of-memory issues. This calls for splitting the large JSON object into smaller chunks using some form of preprocessing logic. Once preprocessed, these smaller files can then be further processed in parallel by worker nodes without running into out-of-memory issues.
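
The chunking idea itself can be sketched with the standard library alone. The generator below stands in for the items of one large nested array, and the file names and the helper chunked are hypothetical. Parsing a large file incrementally is a separate concern that this sketch does not cover.

```python
from itertools import islice
import json


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Hypothetical stand-in for the items of one large nested array.
records = ({"id": i} for i in range(2500))

# Each batch would become one small JSON file; here it is only serialized.
files = {
    f"part_{index}.json": json.dumps(batch)
    for index, batch in enumerate(chunked(records, 1000))
}
print({name: len(json.loads(body)) for name, body in files.items()})
```

The dictionary keeps every serialized batch only for demonstration. A real job would write each batch out and discard it, so that only one batch is held in memory at a time.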

Solution overview

The solution involves a two-step approach. The first is a preprocessing step, which takes the large JSON object as input and splits it into multiple manageable chunks. This is required to address the challenges we mentioned earlier. The second is a processing step, which prepares and publishes data for analysis.

The preprocessing step uses an AWS Glue Python shell job to split the large JSON object into smaller JSON files. The processing step unnests and flattens the array items from these smaller JSON files in parallel. It then partitions and writes the output as Parquet on Amazon Simple Storage Service (Amazon S3). The partitioned data is cataloged and analyzed using Athena. The following diagram illustrates this workflow.

Solution Overview

Prerequisites

To implement the solution in your own AWS account, you need to create or configure the following AWS resources in advance:

  • An S3 bucket to persist the source and processed data. Download the input file and upload it to the path s3://yourbucket/ptd/2023-03-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_PS1-50_C2_in-network-rates.json.gz.
  • An AWS Identity and Access Management (IAM) role for your AWS Glue extract, transform, and load (ETL) job. For instructions, refer to Setting up IAM permissions for AWS Glue. Adjust the permissions to ensure AWS Glue has read/write access to Amazon S3 locations.
  • An IAM role for Athena with AWS Glue Data Catalog permissions to create and query tables.

Create an AWS Glue preprocessing job

The preprocessing step uses ijson, an open-source iterative JSON parser, to extract items in the outermost array of top-level attributes. By streaming and iteratively parsing the large JSON file, the preprocessing step loads only a portion of the file into memory, thereby avoiding out-of-memory issues. It also uses s3pathlib, an open-source Python interface to Amazon S3, which makes it easy to work with S3 file systems.

To create and run the AWS Glue job for preprocessing, complete the following steps:

  1. On the AWS Glue console, choose Jobs under Glue Studio in the navigation pane.
  2. Create a new job.
  3. Select Python shell script editor.
  4. Select Create a new script with boilerplate code.
    Python Shell Script Editor
  5. Enter the following code into the editor (adjust the S3 bucket names and paths to point to the input and output locations in Amazon S3):
import ijson
import json
import decimal
from s3pathlib import S3Path
from s3pathlib import context
import boto3
from io import StringIO

class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        return json.JSONEncoder.default(self, obj)
        
def upload_to_s3(data, upload_path):
    data = bytes(StringIO(json.dumps(data,cls=JSONEncoder)).getvalue(),encoding='utf-8')
    s3_client.put_object(Body=data, Bucket=bucket, Key=upload_path)
        
s3_client = boto3.client('s3')

#Replace with your bucket and path to JSON object on your bucket
bucket = 'yourbucket'
largefile_key = 'ptd/2023-03-01_United-HealthCare-Services--Inc-_Third-Party-Administrator_PS1-50_C2_in-network-rates.json.gz'
p = S3Path(bucket, largefile_key)


#Replace the paths to suit your needs
upload_path_base = 'ptd/preprocessed/base/base.json'
upload_path_in_network = 'ptd/preprocessed/in_network/'
upload_path_provider_references = 'ptd/preprocessed/provider_references/'

#Extract the values of the following top-level attributes and persist them on your S3 bucket
# -- reporting_entity_name
# -- reporting_entity_type
# -- last_updated_on
# -- version

base ={
    'reporting_entity_name' : '',
    'reporting_entity_type' : '',
    'last_updated_on' :'',
    'version' : ''
}

with p.open("r") as f:
    obj = ijson.items(f, 'reporting_entity_name')
    for evt in obj:
        base['reporting_entity_name'] = evt
        break
    
with p.open("r") as f:
    obj = ijson.items(f, 'reporting_entity_type')
    for evt in obj:
        base['reporting_entity_type'] = evt
        break
    
with p.open("r") as f:
    obj = ijson.items(f, 'last_updated_on')
    for evt in obj:
        base['last_updated_on'] = evt
        break
    
with p.open("r") as f:
    obj = ijson.items(f,'version')
    for evt in obj:
        base['version'] = evt
        break
        
upload_to_s3(base,upload_path_base)

#Seek the position of JSON key provider_references 
#Iterate through items in provider_references array, and for every 1000 items create a JSON file on S3 bucket
with p.open("r") as f:
    provider_references = ijson.items(f, 'provider_references.item')
    fk = 0
    lst = []
    for rowcnt,row in enumerate(provider_references):
        if rowcnt % 1000 == 0:
            if fk > 0:
                dest = upload_path_provider_references + path
                upload_to_s3(lst,dest)
            lst = []
            path = 'provider_references_{0}.json'.format(fk)
            fk = fk + 1

        lst.append(row)

    path = 'provider_references_{0}.json'.format(fk)
    dest = upload_path_provider_references + path
    upload_to_s3(lst,dest)
    
#Seek the position of JSON key in_network
#Iterate through items in in_network array, and for every 25 items create a JSON file on S3 bucket
with p.open("r") as f:
    in_network = ijson.items(f, 'in_network.item')
    fk = 0
    lst = []
    for rowcnt,row in enumerate(in_network):
        if rowcnt % 25 == 0:
            if fk > 0:
                dest = upload_path_in_network + path
                upload_to_s3(lst,dest)
            lst = []
            path = 'in_network_{0}.json'.format(fk)
            fk = fk + 1

        lst.append(row)


    path = 'in_network_{0}.json'.format(fk)
    dest = upload_path_in_network + path
    upload_to_s3(lst,dest)
  1. Update the properties of your job on the Job details tab:
    1. For Type, choose Python Shell.
    2. For Python version, choose Python 3.9.
    3. For Data processing units, choose 1 DPU.

For Python shell jobs, you can allocate either 0.0625 or 1 DPU. The default is 0.0625 DPU. A DPU is a relative measure of processing power that consists of 4 vCPUs of compute capacity and 16 GB of memory.
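To see why the larger setting matters for a multi-gigabyte input, it can help to translate the two options into rough resource figures. The following sketch applies the DPU definition above and assumes that resources scale linearly with the DPU fraction, which is a simplification for illustration only.

```python
# Figures taken from the text: one DPU is 4 vCPUs and 16 GB of memory.
VCPUS_PER_DPU = 4
MEMORY_GB_PER_DPU = 16


def dpu_resources(dpu):
    """Return (vCPUs, memory in GB) for a DPU allocation, assuming linear scaling."""
    return dpu * VCPUS_PER_DPU, dpu * MEMORY_GB_PER_DPU


# The two allocations available to Python shell jobs.
default_vcpus, default_memory_gb = dpu_resources(0.0625)
full_vcpus, full_memory_gb = dpu_resources(1)
```

Under that assumption, the default allocation corresponds to roughly 1 GB of memory, compared with 16 GB for a full DPU.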


The Python libraries ijson and s3pathlib are available in pip and can be installed using the AWS Glue job parameter --additional-python-modules. You can also choose to package these libraries, upload them to Amazon S3, and refer to them from your AWS Glue job. For instructions on packaging your library, refer to Providing your own Python library.

  1. To install the Python libraries, set the following job parameters:
    • Key: --additional-python-modules
    • Value: ijson,s3pathlib
  2. Run the job.
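If you prefer to script the job definition rather than use the console, the same settings can be expressed as arguments to the boto3 AWS Glue create_job call. The following minimal sketch only builds the argument dictionary. The job name, role ARN, and script location are placeholder assumptions, and the API call itself is left commented out so the snippet runs without AWS credentials.

```python
def build_python_shell_job_args(name, role_arn, script_location):
    """Build keyword arguments for glue.create_job that mirror the console settings."""
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {
            "Name": "pythonshell",  # Python shell job type
            "ScriptLocation": script_location,
            "PythonVersion": "3.9",
        },
        "MaxCapacity": 1.0,  # 1 DPU instead of the 0.0625 default
        "DefaultArguments": {
            # Installs the two libraries from pip when the job starts
            "--additional-python-modules": "ijson,s3pathlib",
        },
    }


job_args = build_python_shell_job_args(
    name="ptd-preprocessing",  # hypothetical job name
    role_arn="arn:aws:iam::123456789012:role/YourGlueJobRole",  # hypothetical role
    script_location="s3://yourbucket/scripts/preprocess.py",  # hypothetical path
)

# With credentials configured, the job could then be created with:
# import boto3
# boto3.client("glue").create_job(**job_args)
```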

The preprocessing step creates three folders in the S3 bucket: base, in_network and provider_references.


Files in the in_network and provider_references folders contain arrays of JSON objects. Each of these JSON objects represents an element in the outermost array of the original large JSON object.
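The chunking behind these files can be sketched independently of ijson and Amazon S3. The following minimal example uses only the standard library to group any iterator into fixed-size batches and pair each batch with a numbered object key. The helper names and the generated sample rows are assumptions for illustration; the preprocessing script implements the same idea with an explicit counter.

```python
import json
from itertools import islice


def batched(items, size):
    """Yield lists of at most `size` items from any iterable, one batch at a time."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def batch_objects(items, size, prefix, stem):
    """Pair each batch with the object key it would be written to, plus its JSON body."""
    for index, batch in enumerate(batched(items, size)):
        yield f"{prefix}{stem}_{index}.json", json.dumps(batch)


# A generator stands in for the streamed provider_references items.
rows = ({"provider_group_id": i} for i in range(2500))
objects = list(batch_objects(
    rows, 1000, "ptd/preprocessed/provider_references/", "provider_references"))
keys = [key for key, _ in objects]
```

Because only one batch is held in memory at a time, the batch size controls the trade-off between memory use and the number of objects written.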


Create an AWS Glue processing job

The processing job uses the output of the preprocessing step to create a denormalized view of data by extracting and flattening elements and attributes from nested arrays. The extent of unnesting depends on the attributes we need for analysis. For example, attributes such as negotiated_rate, npi, and billing_code are essential for analysis and extracting values associated with these attributes requires multiple levels of unnesting. The denormalized data is then partitioned by the billing_code column, persisted as Parquet on Amazon S3, and registered as a table on the AWS Glue Data Catalog for querying.

The following code sample guides you through the implementation using PySpark. The columns used to partition the data depend on the query patterns used to analyze the data. Arriving at a partitioning strategy that is in line with the query patterns will improve overall query performance during analysis. This post assumes that the queries used for analyzing data will always use the column billing_code to filter and fetch data of interest. Data in each partition is bucketed by npi to improve query performance.
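To make the effect of each level of unnesting concrete, the following sketch mimics Spark's explode function on plain Python dictionaries. It is not PySpark and is not part of the job; the helper name and the simplified sample record (with made-up values) are assumptions for illustration.

```python
def explode(rows, column, alias):
    """Mimic Spark's explode: emit one output row per element of the array in `column`."""
    for row in rows:
        rest = {k: v for k, v in row.items() if k != column}
        for element in row[column]:
            yield {**rest, alias: element}


# One simplified in_network item.
in_network = [{
    "billing_code": "0001",
    "negotiated_rates": [
        {"provider_references": [1, 2],
         "negotiated_prices": [{"negotiated_rate": 100.0}, {"negotiated_rate": 120.0}]},
    ],
}]

# Level 1: one row per negotiated_rates element.
level1 = list(explode(in_network, "negotiated_rates", "rate"))

# Level 2: lift the nested arrays to the top level, then one row per provider reference.
lifted = [{"billing_code": r["billing_code"],
           "prices": r["rate"]["negotiated_prices"],
           "provider_references": r["rate"]["provider_references"]} for r in level1]
level2 = list(explode(lifted, "provider_references", "provider_reference"))

# Level 3: one row per negotiated price.
level3 = list(explode(level2, "prices", "price"))
flat = [(r["billing_code"], r["provider_reference"], r["price"]["negotiated_rate"])
        for r in level3]
```

A single input record with two provider references and two prices becomes four flat rows, which is why the row count grows quickly as more nested arrays are unnested.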

To create your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, choose Jobs under Glue Studio in the navigation pane.
  2. Create a new job.
  3. Select Spark script editor.
  4. Select Create a new script with boilerplate code.
  5. Enter the following code into the editor (adjust the S3 bucket names and paths to point to the input and output locations in Amazon S3):
import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
from pyspark.sql.functions import explode

#create a dataframe of base objects - reporting_entity_name, reporting_entity_type, version, last_updated_on
#using the output of preprocessing step

base_df = spark.read.json('s3://yourbucket/ptd/preprocessed/base/')

#create a dataframe over provider_references objects using the output of preprocessing step
prvd_df = spark.read.json('s3://yourbucket/ptd/preprocessed/provider_references/')

#cross join dataframe of base objects with dataframe of provider_references 
prvd_df = prvd_df.crossJoin(base_df)

#create a dataframe over in_network objects using the output of preprocessing step
in_ntwrk_df = spark.read.json('s3://yourbucket/ptd/preprocessed/in_network/')

#unnest and flatten negotiated_rates and provider_references from in_network objects
in_ntwrk_df2 = in_ntwrk_df.select(
 in_ntwrk_df.billing_code, in_ntwrk_df.billing_code_type, in_ntwrk_df.billing_code_type_version,
 in_ntwrk_df.covered_services, in_ntwrk_df.description, in_ntwrk_df.name,
 explode(in_ntwrk_df.negotiated_rates).alias('exploded_negotiated_rates'),
 in_ntwrk_df.negotiation_arrangement)


in_ntwrk_df3 = in_ntwrk_df2.select(
 in_ntwrk_df2.billing_code, in_ntwrk_df2.billing_code_type, in_ntwrk_df2.billing_code_type_version,
 in_ntwrk_df2.covered_services, in_ntwrk_df2.description, in_ntwrk_df2.name,
 in_ntwrk_df2.exploded_negotiated_rates.negotiated_prices.alias(
 'exploded_negotiated_rates_negotiated_prices'),
 explode(in_ntwrk_df2.exploded_negotiated_rates.provider_references).alias(
 'exploded_negotiated_rates_provider_references'),
 in_ntwrk_df2.negotiation_arrangement)

#join the exploded in_network dataframe with provider_references dataframe
jdf = prvd_df.join(
 in_ntwrk_df3,
 prvd_df.provider_group_id == in_ntwrk_df3.exploded_negotiated_rates_provider_references,"fullouter")

#un-nest and flatten attributes from rest of the nested arrays.
jdf2 = jdf.select(
 jdf.reporting_entity_name,jdf.reporting_entity_type,jdf.last_updated_on,jdf.version,
 jdf.provider_group_id, jdf.provider_groups, jdf.billing_code,
 jdf.billing_code_type, jdf.billing_code_type_version, jdf.covered_services,
 jdf.description, jdf.name,
 explode(jdf.exploded_negotiated_rates_negotiated_prices).alias(
 'exploded_negotiated_rates_negotiated_prices'),
 jdf.exploded_negotiated_rates_provider_references,
 jdf.negotiation_arrangement)

jdf3 = jdf2.select(
 jdf2.reporting_entity_name,jdf2.reporting_entity_type,jdf2.last_updated_on,jdf2.version,
 jdf2.provider_group_id,
 explode(jdf2.provider_groups).alias('exploded_provider_groups'),
 jdf2.billing_code, jdf2.billing_code_type, jdf2.billing_code_type_version,
 jdf2.covered_services, jdf2.description, jdf2.name,
 jdf2.exploded_negotiated_rates_negotiated_prices.additional_information.
 alias('additional_information'),
 jdf2.exploded_negotiated_rates_negotiated_prices.billing_class.alias(
 'billing_class'),
 jdf2.exploded_negotiated_rates_negotiated_prices.billing_code_modifier.
 alias('billing_code_modifier'),
 jdf2.exploded_negotiated_rates_negotiated_prices.expiration_date.alias(
 'expiration_date'),
 jdf2.exploded_negotiated_rates_negotiated_prices.negotiated_rate.alias(
 'negotiated_rate'),
 jdf2.exploded_negotiated_rates_negotiated_prices.negotiated_type.alias(
 'negotiated_type'),
 jdf2.exploded_negotiated_rates_negotiated_prices.service_code.alias(
 'service_code'), jdf2.exploded_negotiated_rates_provider_references,
 jdf2.negotiation_arrangement)

jdf4 = jdf3.select(jdf3.reporting_entity_name,jdf3.reporting_entity_type,jdf3.last_updated_on,jdf3.version,
 jdf3.provider_group_id,
 explode(jdf3.exploded_provider_groups.npi).alias('npi'),
 jdf3.exploded_provider_groups.tin.type.alias('tin_type'),
 jdf3.exploded_provider_groups.tin.value.alias('tin'),
 jdf3.billing_code, jdf3.billing_code_type,
 jdf3.billing_code_type_version, jdf3.covered_services,
 jdf3.description, jdf3.name, jdf3.additional_information,
 jdf3.billing_class, jdf3.billing_code_modifier,
 jdf3.expiration_date, jdf3.negotiated_rate,
 jdf3.negotiated_type, jdf3.service_code,
 jdf3.negotiation_arrangement)

#repartition by billing_code. 
#Repartition changes the distribution of data on spark cluster. 
#Repartitioning the data helps avoid writing too many small files.
jdf5=jdf4.repartition("billing_code")
 
datasink_path = "s3://yourbucket/ptd/processed/billing_code_npi/parquet/"

#persist dataframe as parquet on S3 and catalog it
#Partition the data by billing_code. This enables analytical queries to skip data and improve performance of queries
#Data is also bucketed and sorted by npi to improve query performance during analysis

jdf5.write.format('parquet').mode("overwrite").partitionBy('billing_code').bucketBy(2, 'npi').sortBy('npi').saveAsTable('ptdtable', path = datasink_path)
  1. Update the properties of your job on the Job details tab:
    1. For Type, choose Spark.
    2. For Glue version, choose Glue 4.0.
    3. For Language, choose Python 3.
    4. For Worker type, choose G.2X.
    5. For Requested number of workers, enter 20.

Arriving at the number of workers and worker type to use for your processing job depends on factors such as the amount of data being processed, the speed at which it needs to be processed, and the partitioning strategy used. Repartitioning of data can result in out-of-memory issues, especially when data is heavily skewed on the column used to repartition. It’s possible to reach Amazon S3 service limits if too many workers are assigned to the job. This is because tasks running on these worker nodes may try to read/write from the same S3 prefix, causing Amazon S3 to throttle the incoming requests. For more details, refer to Best practices design patterns: optimizing Amazon S3 performance.
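Before settling on a repartitioning column, it can be useful to estimate how skewed the data is on that column. The following sketch computes a simple skew ratio over a list of keys using only the standard library. The function name and the sample billing codes are assumptions for illustration; on a real DataFrame you would typically gather the per-key counts with a groupBy and count instead.

```python
from collections import Counter


def skew_ratio(keys):
    """Return the largest group's row count divided by the mean group size."""
    counts = Counter(keys)
    mean = sum(counts.values()) / len(counts)
    return max(counts.values()) / mean


# Made-up sample in which one billing code dominates the data set.
sample_codes = ["99213"] * 90 + ["0001"] * 5 + ["0002"] * 5
ratio = skew_ratio(sample_codes)
```

A ratio close to 1 suggests evenly sized groups. The further the ratio rises above 1, the more work lands on the task that handles the largest key after repartitioning.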


Exploding array elements creates a new row for every element of each array, so each level of unnesting multiplies the amount of data that needs to be processed. Apache Spark splits this data into multiple Spark partitions on different worker nodes so that it can process large amounts of data in parallel. In Apache Spark, shuffling happens when data needs to be redistributed across the cluster. Shuffle operations are commonly triggered by wide transformations such as join, reduceByKey, groupByKey, and repartition. In case of exceptions due to local storage limitations, it helps to supplement or replace local disk storage capacity with Amazon S3 for large shuffle operations. This is possible with the AWS Glue Spark shuffle plugin with Amazon S3. With the cloud shuffle storage plugin for Apache Spark, you can avoid disk space-related failures.

  1. To use the Spark shuffle plugin, set the following job parameters:
    • Key: --write-shuffle-files-to-s3
    • Value: true

Query the data

You can query the cataloged data using Athena. For instructions on setting up Athena, refer to Setting up.

On the Athena console, choose Query editor in the navigation pane to run your query, and specify your data source and database.


To find the minimum, maximum, and average negotiated rates for procedure codes, run the following query:

SELECT
billing_code,
round(min(negotiated_rate),2) as min_price,
round(avg(negotiated_rate),2) as avg_price,
round(max(negotiated_rate),2) as max_price,
description
FROM "default"."ptdtable"
group by billing_code, description
limit 10;

The following screenshot shows the query results.

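If you want to check what the aggregation computes before running it against the full dataset, you can try the same query shape on a few rows locally. The following sketch uses Python's built-in sqlite3 module as a stand-in for Athena. The table contents are made up, and an ORDER BY clause is added (an assumption, not part of the original query) so that the output order is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ptdtable (billing_code TEXT, description TEXT, negotiated_rate REAL)")
conn.executemany(
    "INSERT INTO ptdtable VALUES (?, ?, ?)",
    [("0001", "Sample procedure A", 100.0),
     ("0001", "Sample procedure A", 120.0),
     ("0001", "Sample procedure A", 155.5),
     ("0002", "Sample procedure B", 80.0)],
)

# Same aggregation as the Athena query, grouped per procedure code and description.
rows = conn.execute(
    """
    SELECT billing_code,
           round(min(negotiated_rate), 2) AS min_price,
           round(avg(negotiated_rate), 2) AS avg_price,
           round(max(negotiated_rate), 2) AS max_price,
           description
    FROM ptdtable
    GROUP BY billing_code, description
    ORDER BY billing_code
    LIMIT 10
    """
).fetchall()
conn.close()
```

Each result row holds one billing code with its minimum, average, and maximum negotiated rate across all matching records.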

Clean up

To avoid incurring future charges, delete the AWS resources you created:

  1. Delete the S3 objects and bucket.
  2. Delete the IAM policies and roles.
  3. Delete the AWS Glue jobs for preprocessing and processing.

Conclusion

This post guided you through the necessary preprocessing and processing steps to query and analyze price transparency-related machine-readable files. Although it’s possible to use other AWS services to process such data, this post focused on preparing and publishing data using AWS Glue.

To learn more about the Transparency in Coverage rule, refer to Transparency in Coverage. For best practices for scaling Apache Spark jobs and partitioning data with AWS Glue, refer to Best practices to scale Apache Spark jobs and partition data with AWS Glue. To learn how to monitor AWS Glue jobs, refer to Monitoring AWS Glue Spark jobs.

We look forward to hearing any feedback or questions.


About the Authors

Hari Thatavarthy is a Senior Solutions Architect on the AWS Data Lab team. He helps customers design and build solutions in the data and analytics space. He believes in data democratization and loves to solve complex data processing-related problems. In his spare time, he loves to play table tennis.

Krishna Maddileti is a Senior Solutions Architect on the AWS Data Lab team. He partners with customers on their AWS journey and helps them with data engineering, data lakes, and analytics. In his spare time, he enjoys spending time with his family and playing video games with his 7-year-old.

Yadukishore Tatavarthi is a Senior Partner Solutions Architect at AWS. He works closely with global system integrator partners to enable and support customers moving their workloads to AWS.

Manish Kola is a Solutions Architect on the AWS Data Lab team. He partners with customers on their AWS journey.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Amazon OpenSearch Service now supports 99.99% availability using Multi-AZ with Standby

Post Syndicated from Prashant Agrawal original https://aws.amazon.com/blogs/big-data/amazon-opensearch-service-now-supports-99-99-availability-using-multi-az-with-standby/

Customers use Amazon OpenSearch Service for mission-critical applications and monitoring. But what happens when OpenSearch Service itself is unavailable? If your ecommerce search is down, for example, you’re losing revenue. If you’re monitoring your application with OpenSearch Service, and it becomes unavailable, your ability to detect, diagnose, and repair issues with your application is diminished. In these cases, you may suffer lost revenue, customer dissatisfaction, reduced productivity, or even damage to your organization’s reputation.

OpenSearch Service offers an SLA of three 9s (99.9%) availability when following best practices. However, following those practices is complicated, and can require knowledge of and experience with OpenSearch’s data deployment and management, along with an understanding of how OpenSearch Service interacts with AWS Availability Zones and networking, distributed systems, OpenSearch’s self-healing capabilities, and its recovery methods. Furthermore, when an issue arises, such as a node becoming unresponsive, OpenSearch Service recovers by recreating the missing shards (data), causing a potentially large movement of data in the domain. This data movement increases resource usage on the cluster, which can impact performance. If the cluster is not sized properly, it can experience degraded availability, which defeats the purpose of provisioning the cluster across three Availability Zones.

Today, AWS is announcing the new deployment option Multi-AZ with Standby for OpenSearch Service, which helps you offload some of that heavy lifting in terms of high frequency monitoring, fast failure detection, and quick recovery from failure, and keeps your domains available and performant even in the event of an infrastructure failure. With Multi-AZ with Standby, you get 99.99% availability with consistent performance for a domain.

In this post, we discuss the benefits of this new option and how to configure your OpenSearch cluster with Multi-AZ with Standby.

Solution overview

The OpenSearch Service team has incorporated years of experience running tens of thousands of domains for our customers into the Multi-AZ with Standby feature. When you adopt Multi-AZ with Standby, OpenSearch Service creates a cluster across three Availability Zones, with each Availability Zone containing a complete copy of data in the cluster. OpenSearch Service then puts one Availability Zone into standby mode, routing all queries to the other two Availability Zones. When it detects a hardware-related failure, OpenSearch Service promotes nodes from the standby pool to become active in less than a minute. When you use Multi-AZ with Standby, OpenSearch Service doesn’t need to redistribute or recreate data from missing nodes. As a result, cluster performance is unaffected, removing the risk of degraded availability.

Prerequisites

Multi-AZ with Standby requires the following prerequisites:

  • The domain needs to run on OpenSearch 1.3 or above
  • The domain is deployed across three Availability Zones
  • The domain has three (or a multiple of three) data nodes
  • You must use three dedicated cluster manager (master) nodes

Refer to Sizing Amazon OpenSearch Service domains for guidance on sizing your domain and dedicated cluster manager nodes.
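The prerequisites above lend themselves to a quick pre-flight check before you change a domain's configuration. The following sketch encodes the four conditions as a plain Python function. The function name and its parameters are assumptions for illustration and don't correspond to any OpenSearch Service API.

```python
def standby_prerequisite_issues(engine_version, az_count, data_nodes, dedicated_managers):
    """Return a list of unmet prerequisites; an empty list means all four checks pass."""
    issues = []
    # Compare versions numerically, for example "1.3" -> (1, 3).
    major, minor = (int(part) for part in engine_version.split(".")[:2])
    if (major, minor) < (1, 3):
        issues.append("engine must be OpenSearch 1.3 or above")
    if az_count != 3:
        issues.append("domain must be deployed across three Availability Zones")
    if data_nodes < 3 or data_nodes % 3 != 0:
        issues.append("data node count must be three or a multiple of three")
    if dedicated_managers != 3:
        issues.append("three dedicated cluster manager nodes are required")
    return issues


ok = standby_prerequisite_issues("2.5", az_count=3, data_nodes=6, dedicated_managers=3)
problems = standby_prerequisite_issues("1.2", az_count=3, data_nodes=4, dedicated_managers=3)
```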

Configure your OpenSearch cluster using Multi-AZ with Standby

You can use Multi-AZ with Standby when you create a new domain, or you can add it to an existing domain. If you’re creating a new domain using the AWS Management Console, you can create it with Multi-AZ with Standby by selecting either the new Easy create option or the traditional Standard create option. You can update existing domains to use Multi-AZ with Standby by editing their domain configuration.

The Easy create option, as the name suggests, makes creating a domain easier by defaulting to best practice choices for most of the configuration (the majority of which can be altered later). The domain will be set up for high availability from the start and deployed as Multi-AZ with Standby.

When choosing the data nodes, you should choose three (or a multiple of three) data nodes so that they are equally distributed across each of the Availability Zones. The Data nodes table on the OpenSearch Service console provides a visual representation of the data nodes, showing that one of the Availability Zones will be put on standby.

Similarly, while selecting the cluster manager (master) node, consider the number of data nodes, indexes, and shards that you plan to have before deciding the instance size.

After the domain is created, you can check its deployment type on the OpenSearch Service console under Cluster configuration, as shown in the following screenshot.

When creating an index, make sure that the number of copies (primary plus replicas) is a multiple of three. If you don’t specify the number of replicas, the service will default to two. This is important so that there is at least one copy of the data in each Availability Zone. We recommend using an index template or similar for logs workloads.
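As a rough illustration of the copy-count rule, the following sketch builds the body of a composable index template and rejects replica settings that would not yield a multiple of three copies. The index pattern, shard count, and helper names are assumptions for illustration. The resulting dictionary is the kind of body you would send to the _index_template API with your HTTP client of choice.

```python
def copies_per_shard(number_of_replicas):
    """Each shard has one primary plus its replicas."""
    return 1 + number_of_replicas


def build_index_template(pattern, shards, replicas=2):
    """Return an index template body, enforcing a multiple-of-three copy count."""
    if copies_per_shard(replicas) % 3 != 0:
        raise ValueError("primary plus replicas must be a multiple of three")
    return {
        "index_patterns": [pattern],
        "template": {
            "settings": {
                "index": {
                    "number_of_shards": shards,
                    "number_of_replicas": replicas,
                }
            }
        },
    }


# Two replicas give three copies, one for each Availability Zone.
template = build_index_template("logs-*", shards=3)
```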

OpenSearch Service distributes the nodes and data copies equally across the three Availability Zones. During normal operations, the standby nodes don’t receive any search requests. The two active Availability Zones respond to all the search requests. However, data is replicated to these standby nodes to ensure you have a full copy of the data in each Availability Zone at all times.

Response to infrastructure failure events

OpenSearch Service continuously monitors the domain for events like node failure, disk failure, or Availability Zone failure. In the event of an infrastructure failure like an Availability Zone failure, OpenSearch Service promotes the standby nodes to active while the impacted Availability Zone recovers. Impact (if any) is limited to the in-flight requests because traffic is weighted away from the impacted Availability Zone in less than a minute.

You can check the status of the domain, data node metrics for both active and standby, and Availability Zone rotation metrics on the Cluster health tab. The following screenshots show the cluster health and metrics for data nodes such as CPU utilization, JVM memory pressure, and storage.

The following screenshot of the AZ Rotation Metrics section (you can find this on the Cluster health tab) shows the read and write status of the Availability Zones. OpenSearch Service rotates the standby Availability Zone every 30 minutes to ensure the system is running and ready to respond to events. Availability Zones responding to traffic have a read value of 1, and the standby Availability Zone has a value of 0.

Considerations

Several improvements and guardrails have been made for this feature that offer higher availability and maintain performance. Some static limits have been applied that are specifically related to the number of shards per node, number of shards for a domain, and the size of a shard. OpenSearch Service also enables Auto-Tune by default. Multi-AZ with Standby restricts the storage to GP3- or SSD-backed instances for the most cost-effective and performant storage options. Additionally, we’re introducing an advanced traffic shaping mechanism that will detect rogue queries, which further enhances the reliability of the domain.

We recommend evaluating your domain infrastructure needs based on your workload to achieve high availability and performance.

Conclusion

Multi-AZ with Standby is now available on OpenSearch Service in all AWS Regions globally where OpenSearch Service is available, except US West (N. California) and AWS GovCloud (US-Gov-East, US-Gov-West). Try it out and send your feedback to AWS re:Post for Amazon OpenSearch Service or through your usual AWS support contacts.


About the authors

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Rohin Bhargava is a Sr. Product Manager with the Amazon OpenSearch Service team. His passion at AWS is to help customers find the correct mix of AWS services to achieve success for their business goals.

Build, deploy, and run Spark jobs on Amazon EMR with the open-source EMR CLI tool

Post Syndicated from Damon Cortesi original https://aws.amazon.com/blogs/big-data/build-deploy-and-run-spark-jobs-on-amazon-emr-with-the-open-source-emr-cli-tool/

Today, we’re pleased to introduce the Amazon EMR CLI, a new command line tool to package and deploy PySpark projects across different Amazon EMR environments. With the introduction of the EMR CLI, you now have a simple way to not only deploy a wide range of PySpark projects to remote EMR environments, but also integrate with your CI/CD solution of choice.

In this post, we show how you can use the EMR CLI to create a new PySpark project from scratch and deploy it to Amazon EMR Serverless in one command.

Overview of solution

The EMR CLI is an open-source tool to help improve the developer experience of developing and deploying jobs on Amazon EMR. When you’re just getting started with Apache Spark, there are a variety of options with respect to how to package, deploy, and run jobs that can be overwhelming or require deep domain expertise. The EMR CLI provides simple commands for these actions that remove the guesswork from deploying Spark jobs. You can use it to create new projects or alongside existing PySpark projects.

In this post, we walk through creating a new PySpark project that analyzes weather data from the NOAA Global Surface Summary of Day open dataset. We’ll use the EMR CLI to do the following:

  1. Initialize the project.
  2. Package the dependencies.
  3. Deploy the code and dependencies to Amazon Simple Storage Service (Amazon S3).
  4. Run the job on EMR Serverless.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • An EMR Serverless application in the us-east-1 Region
  • An S3 bucket for your code and logs in the us-east-1 Region
  • An AWS Identity and Access Management (IAM) job role that can run EMR Serverless jobs and access S3 buckets
  • Python version >= 3.7
  • Docker

If you don’t already have an existing EMR Serverless application, you can use the following AWS CloudFormation template or use the emr bootstrap command after you’ve installed the CLI.


Install the EMR CLI

You can find the source for the EMR CLI in the GitHub repo, but it’s also distributed via PyPI. It requires Python version >= 3.7 to run and is tested on macOS, Linux, and Windows. To install the latest version, use the following command:

pip3 install emr-cli

You should now be able to run the emr --help command and see the different subcommands you can use:

❯ emr --help
Usage: emr [OPTIONS] COMMAND [ARGS]...

  Package, deploy, and run PySpark projects on EMR.

Options:
  --help  Show this message and exit.

Commands:
  bootstrap  Bootstrap an EMR Serverless environment.
  deploy     Copy a local project to S3.
  init       Initialize a local PySpark project.
  package    Package a project and dependencies into dist/
  run        Run a project on EMR, optionally build and deploy
  status

If you didn’t already create an EMR Serverless application, the bootstrap command can create a sample environment for you and a configuration file with the relevant settings. Assuming you used the provided CloudFormation stack, set the following environment variables using the information on the Outputs tab of your stack. Set the Region in the terminal to us-east-1 and set a few other environment variables we’ll need along the way:

export AWS_REGION=us-east-1
export APPLICATION_ID=<YOUR_EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<YOUR_EMR_SERVERLESS_JOB_ROLE_ARN>
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>

We use us-east-1 because that’s where the NOAA GSOD data bucket is. EMR Serverless can access S3 buckets and other AWS resources in the same Region by default. To access other services, configure EMR Serverless with VPC access.

Initialize a project

Next, we use the emr init command to initialize a default PySpark project for us in the provided directory. The default templates create a standard Python project that uses pyproject.toml to define its dependencies. In this case, we use Pandas and PyArrow in our script, so those are already pre-populated.

❯ emr init my-project
[emr-cli]: Initializing project in my-project
[emr-cli]: Project initialized.

After the project is initialized, you can run cd my-project or open the my-project directory in your code editor of choice. You should see the following set of files:

my-project
├── Dockerfile
├── entrypoint.py
├── jobs
│   └── extreme_weather.py
└── pyproject.toml

Note that we also have a Dockerfile here. This is used by the package command to ensure that our project dependencies are built on the right architecture and operating system for Amazon EMR.

If you use Poetry to manage your Python dependencies, you can also add a --project-type poetry flag to the emr init command to create a Poetry project.

If you already have an existing PySpark project, you can use emr init --dockerfile to create the Dockerfile necessary to package things up.

Run the project

Now that we’ve got our sample project created, we need to package our dependencies, deploy the code to Amazon S3, and start a job on EMR Serverless. With the EMR CLI, you can do all of that in one command. Make sure to run the command from the my-project directory:

emr run \
--entry-point entrypoint.py \
--application-id ${APPLICATION_ID} \
--job-role ${JOB_ROLE_ARN} \
--s3-code-uri s3://${S3_BUCKET}/tmp/emr-cli-demo/ \
--build \
--wait

This command performs several actions:

  1. Auto-detects the type of Spark project in the current directory.
  2. Initiates a build for your project to package up dependencies.
  3. Copies your entry point and resulting build files to Amazon S3.
  4. Starts an EMR Serverless job.
  5. Waits for the job to finish, exiting with an error status if it fails.

You should now see the following output in your terminal as the job begins running in EMR Serverless:

[emr-cli]: Job submitted to EMR Serverless (Job Run ID: 00f8uf1gpdb12r0l)
[emr-cli]: Waiting for job to complete...
[emr-cli]: Job state is now: SCHEDULED
[emr-cli]: Job state is now: RUNNING
[emr-cli]: Job state is now: SUCCESS
[emr-cli]: Job completed successfully!

And that’s it! If you want to run the same code on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), you can replace --application-id with --cluster-id j-11111111. The CLI will take care of sending the right spark-submit commands to your EMR cluster.
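If you call the CLI from a script or a CI job, it can be convenient to assemble the command in one place and switch between EMR Serverless and Amazon EMR on Amazon EC2 with a single argument. The following sketch builds the argument list for the emr run command shown earlier, using only the flags that appear in this post. The function name, the fallback placeholder values, and the decision to keep --job-role for both targets are assumptions for illustration.

```python
import os
import shlex


def build_emr_run_command(entry_point, s3_code_uri, job_role,
                          application_id=None, cluster_id=None,
                          build=True, wait=True):
    """Assemble an `emr run` argument list for either EMR Serverless or EMR on EC2."""
    if (application_id is None) == (cluster_id is None):
        raise ValueError("provide exactly one of application_id or cluster_id")
    command = ["emr", "run", "--entry-point", entry_point]
    if application_id is not None:
        command += ["--application-id", application_id]
    else:
        command += ["--cluster-id", cluster_id]
    command += ["--job-role", job_role, "--s3-code-uri", s3_code_uri]
    if build:
        command.append("--build")
    if wait:
        command.append("--wait")
    return command


# Values come from the environment variables set earlier, with placeholder fallbacks.
bucket = os.environ.get("S3_BUCKET", "your-bucket")
role = os.environ.get("JOB_ROLE_ARN", "arn:aws:iam::123456789012:role/your-job-role")

serverless = build_emr_run_command(
    "entrypoint.py", f"s3://{bucket}/tmp/emr-cli-demo/", role,
    application_id=os.environ.get("APPLICATION_ID", "your-application-id"))
on_ec2 = build_emr_run_command(
    "entrypoint.py", f"s3://{bucket}/tmp/emr-cli-demo/", role,
    cluster_id="j-11111111")

# A shell-safe string for logging; subprocess.run(serverless, check=True) would run it.
printable = shlex.join(serverless)
```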

Now let’s walk through some of the other commands.

emr package

PySpark projects can be packaged in numerous ways, from a single .py file to a complex Poetry project with various dependencies. The EMR CLI can help consistently package your projects without having to worry about the details.

For example, if you have a single .py file in your project directory, the package command doesn’t need to do anything. If, however, you have multiple .py files in a typical Python project style, the emr package command will zip these files up as a package that can later be uploaded to Amazon S3 and provided to your PySpark job using the --py-files option. If you have third-party dependencies defined in pyproject.toml, emr package will create a virtual environment archive and start your EMR job with the spark.archives option.

The EMR CLI also supports Poetry for dependency management and packaging. If you have a Poetry project with a corresponding poetry.lock file, there’s nothing else you need to do. The emr package command will detect your poetry.lock file and automatically build the project using the Poetry Bundle plugin. You can use a Poetry project in two ways:

  • Create a project using the emr init command. The command takes a --project-type poetry option that creates a Poetry project for you:
    ❯ emr init --project-type poetry emr-poetry  
    [emr-cli]: Initializing project in emr-poetry  
    [emr-cli]: Project initialized.
    ❯ cd emr-poetry
    ❯ poetry install

  • If you have a pre-existing project, you can use the emr init --dockerfile option, which creates a Dockerfile that is automatically used when you run emr package.

Finally, as noted earlier, the EMR CLI provides you a default Dockerfile based on Amazon Linux 2 that you can use to reliably build package artifacts that are compatible with different EMR environments.

emr deploy

The emr deploy command takes care of copying the necessary artifacts for your project to Amazon S3, so you don’t have to worry about it. Regardless of how the project is packaged, emr deploy will copy the resulting files to your Amazon S3 location of choice.

One use case for this is with CI/CD pipelines. Sometimes you want to deploy a specific version of code to Amazon S3 to be used in your data pipelines. With emr deploy, this is as simple as changing the --s3-code-uri parameter.

For example, let’s assume you’ve already packaged your project using the emr package command. Most CI/CD pipelines allow you to access the git tag. You can use that as part of the emr deploy command to deploy a new version of your artifacts. In GitHub Actions, the tag is available as github.ref_name, and you can use it in an action to deploy a versioned artifact to Amazon S3. See the following code:

emr deploy \
    --entry-point entrypoint.py \
    --s3-code-uri s3://<BUCKET_NAME>/<PREFIX>/${{github.ref_name}}/

In your downstream jobs, you could then update the location of your entry point files to point to this new location when you’re ready, or you can use the emr run command discussed in the next section.

emr run

Let’s take a quick look at the emr run command. We’ve used it before to package, deploy, and run in one command, but you can also use it to run on already-deployed artifacts. Let’s look at the specific options:

❯ emr run --help
Usage: emr run [OPTIONS]

  Run a project on EMR, optionally build and deploy

Options:
  --application-id TEXT     EMR Serverless Application ID
  --cluster-id TEXT         EMR on EC2 Cluster ID
  --entry-point FILE        Python or Jar file for the main entrypoint
  --job-role TEXT           IAM Role ARN to use for the job execution
  --wait                    Wait for job to finish
  --s3-code-uri TEXT        Where to copy/run code artifacts to/from
  --job-name TEXT           The name of the job
  --job-args TEXT           Comma-delimited string of arguments to be passed
                            to Spark job

  --spark-submit-opts TEXT  String of spark-submit options
  --build                   Package and deploy job artifacts
  --show-stdout             Show the stdout of the job after it's finished
  --help                    Show this message and exit.

If you want to run your code on EMR Serverless, the emr run command takes the --application-id and --job-role parameters. If you want to run on EMR on EC2, you only need the --cluster-id option.

Both targets require --entry-point and --s3-code-uri. --entry-point is the main script that will be called by Amazon EMR. --s3-code-uri is the Amazon S3 location where the emr deploy command uploads your entry point and any packaged dependencies; the EMR CLI builds the relevant spark-submit properties pointing to these artifacts.

There are a few different ways to customize the job:

  • --job-name – Allows you to specify the job or step name
  • --job-args – Allows you to provide command line arguments to your script
  • --spark-submit-opts – Allows you to add additional spark-submit options like --conf spark.jars or others
  • --show-stdout – Currently only works with single-file .py jobs on EMR on EC2, but will display stdout in your terminal after the job is complete

As we’ve seen before, --build invokes both the package and deploy commands. This makes it easier to iterate on local development when your code still needs to run remotely. You can simply use the same emr run command over and over again to build, deploy, and run your code in your environment of choice.

Future updates

The EMR CLI is under active development. Updates are currently in progress to support Amazon EMR on EKS and allow for the creation of local development environments to make local iteration of Spark jobs even easier. Feel free to contribute to the project in the GitHub repository.

Clean up

To avoid incurring future charges, stop or delete your EMR Serverless application. If you used the CloudFormation template, be sure to delete your stack.

Conclusion

With the release of the EMR CLI, we’ve made it easier for you to deploy and run Spark jobs on EMR Serverless. The utility is available as open source on GitHub. We’re planning a host of new functionalities; if there are specific requests you have, feel free to file an issue or open a pull request!


About the author

Damon is a Principal Developer Advocate on the EMR team at AWS. He’s worked with data and analytics pipelines for over 10 years and splits his time between splitting service logs and stacking firewood.

How SOCAR handles large IoT data with Amazon MSK and Amazon ElastiCache for Redis

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-socar-handles-large-iot-data-with-amazon-msk-and-amazon-elasticache-for-redis/

This is a guest blog post co-written with SangSu Park and JaeHong Ahn from SOCAR. 

As companies continue to expand their digital footprint, the importance of real-time data processing and analysis cannot be overstated. The ability to quickly measure and draw insights from data is critical in today’s business landscape, where rapid decision-making is key. With this capability, businesses can stay ahead of the curve and develop new initiatives that drive success.

This post is a continuation of How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control. In this post, we provide a detailed overview of streaming messages with Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon ElastiCache for Redis, covering technical aspects and design considerations that are essential for achieving optimal results.

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR wanted to design and build a solution for a new Fleet Management System (FMS). This system involves the collection, processing, storage, and analysis of Internet of Things (IoT) streaming data from various vehicle devices, as well as historical operational data such as location, speed, fuel level, and component status.

This post demonstrates a solution for SOCAR’s production application that allows them to load streaming data from Amazon MSK into ElastiCache for Redis, optimizing the speed and efficiency of their data processing pipeline. We also discuss the key features, considerations, and design of the solution.

Background

SOCAR operates about 20,000 cars and is planning to include other large vehicle types such as commercial vehicles and courier trucks. SOCAR has deployed in-car devices that capture data using AWS IoT Core. This data was then stored in Amazon Relational Database Service (Amazon RDS). The challenges with this approach included inefficient performance and high resource usage. Therefore, SOCAR looked for purpose-built databases tailored to the needs of their application and usage patterns while meeting SOCAR’s future business and technical requirements. The key requirements for SOCAR included achieving maximum performance for real-time data analytics, which required storing data in an in-memory data store.

After careful consideration, ElastiCache for Redis was selected as the optimal solution due to its ability to handle complex data aggregation rules with ease. One of the challenges faced was loading data from Amazon MSK into the database, because there was no built-in Kafka connector and consumer available for this task. This post focuses on the development of a Kafka consumer application that was designed to tackle this challenge by enabling performant data loading from Amazon MSK to Redis.

Solution overview

Extracting valuable insights from streaming data can be a challenge for businesses with diverse use cases and workloads. That’s why SOCAR built a solution to seamlessly bring data from Amazon MSK into multiple purpose-built databases, while also empowering users to transform data as needed. With fully managed Apache Kafka, Amazon MSK provides a reliable and efficient platform for ingesting and processing real-time data.

The following figure shows an example of the data flow at SOCAR.

[Figure: solution overview]

This architecture consists of three components:

  • Streaming data – Amazon MSK serves as a scalable and reliable platform for streaming data, capable of receiving and storing messages from a variety of sources, including AWS IoT Core, with messages organized into multiple topics and partitions
  • Consumer application – With a consumer application, users can seamlessly bring data from Amazon MSK into a target database or data storage while also defining data transformation rules as needed
  • Target databases – With the consumer application, the SOCAR team was able to load data from Amazon MSK into two separate databases, each serving a specific workload

Although this post focuses on a specific use case with ElastiCache for Redis as the target database and a single topic called gps, the consumer application we describe can handle additional topics and messages, as well as different streaming sources and target databases such as Amazon DynamoDB. Our post covers the most important aspects of the consumer application, including its features and components, design considerations, and a detailed guide to the code implementation.

Components of the consumer application

The consumer application comprises three main parts that work together to consume, transform, and load messages from Amazon MSK into a target database. The following diagram shows an example of data transformations in the handler component.

[Figure: consumer application]

The details of each component are as follows:

  • Consumer – This consumes messages from Amazon MSK and then forwards the messages to a downstream handler.
  • Loader – This is where users specify a target database. For example, SOCAR’s target databases include ElastiCache for Redis and DynamoDB.
  • Handler – This is where users can apply data transformation rules to the incoming messages before loading them into a target database.

Features of the consumer application

The consumer application has three main features:

  • Scalability – This solution is designed to be scalable, ensuring that the consumer application can handle an increasing volume of data and accommodate additional applications in the future. For instance, SOCAR sought to develop a solution capable of handling not only the current data from approximately 20,000 vehicles but also a larger volume of messages as the business and data continue to grow rapidly.
  • Performance – With this consumer application, users can achieve consistent performance, even as the volume of source messages and target databases increases. The application supports multithreading, allowing for concurrent data processing, and can handle unexpected spikes in data volume by easily increasing compute resources.
  • Flexibility – This consumer application can be reused for any new topics without having to build the entire consumer application again. The consumer application can be used to ingest new messages with different configuration values in the handler. SOCAR deployed multiple handlers to ingest many different messages. Also, this consumer application allows users to add additional target locations. For example, SOCAR initially developed a solution for ElastiCache for Redis and then replicated the consumer application for DynamoDB.

Design considerations of the consumer application

Note the following design considerations for the consumer application:

  • Scale out – A key design principle of this solution is scalability. To achieve this, the consumer application runs on Amazon Elastic Kubernetes Service (Amazon EKS), which lets users scale out and replicate consumer applications easily.
  • Consumption patterns – To receive, store, and consume data efficiently, it’s important to design Kafka topics around the messages and how they are consumed. Depending on which workloads ultimately consume them, messages can be received into multiple topics with different schemas. For example, SOCAR has many different topics that are consumed by different workloads.
  • Purpose-built database – The consumer application supports loading data into multiple target options based on the specific use case. For example, SOCAR stored real-time IoT data in ElastiCache for Redis to power real-time dashboard and web applications, while storing recent trip information in DynamoDB that didn’t require real-time processing.

Walkthrough overview

The producer of this solution is AWS IoT Core, which sends out messages into a topic called gps. The target database of this solution is ElastiCache for Redis. ElastiCache for Redis is a fast in-memory data store that provides sub-millisecond latency to power internet-scale, real-time applications. Built on open-source Redis and compatible with the Redis APIs, ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability from Amazon to power the most demanding real-time applications.

The target location can be either another database or storage depending on the use case and workload. SOCAR uses Amazon EKS to operate the containerized solution to achieve scalability, performance, and flexibility. Amazon EKS is a managed Kubernetes service to run Kubernetes in the AWS Cloud. Amazon EKS automatically manages the availability and scalability of the Kubernetes control plane nodes responsible for scheduling containers, managing application availability, storing cluster data, and other key tasks.

For the programming language, the SOCAR team decided to use the Go programming language, utilizing both the AWS SDK for Go and goroutines. A goroutine is a lightweight thread managed by the Go runtime, which makes it easy to run work concurrently. The AWS SDK for Go simplifies the use of AWS services by providing a set of libraries that are consistent and familiar for Go developers.

In the following sections, we walk through the steps to implement the solution:

  1. Create a consumer.
  2. Create a loader.
  3. Create a handler.
  4. Build a consumer application with the consumer, loader, and handler.
  5. Deploy the consumer application.

Prerequisites

For this walkthrough, you should have the following:

Create a consumer

In this example, we use a topic called gps, and the consumer includes a Kafka client that receives messages from the topic. SOCAR created a struct and built a consumer (called NewConsumer in the code) to make it extendable. With this approach, any additional parameters and rules can be added easily.

To authenticate with Amazon MSK, SOCAR uses IAM. Because SOCAR already uses IAM to authenticate other resources, such as Amazon EKS, it uses the same IAM role to authenticate clients for both Amazon MSK and Apache Kafka actions. In the code, this is done through the aws_msk_iam_v2 SASL mechanism.

The following code creates the consumer:

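import (
	"context"
	"crypto/tls"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/rs/zerolog"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

type Consumer struct {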
	logger      *zerolog.Logger
	kafkaReader *kafka.Reader
}

func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer {
	return &Consumer{
		logger: logger,
		kafkaReader: kafka.NewReader(kafka.ReaderConfig{
			Dialer: &kafka.Dialer{
				TLS:           &tls.Config{MinVersion: tls.VersionTLS12},
				Timeout:       10 * time.Second,
				DualStack:     true,
				SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg),
			},
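			Brokers:     brokers,          // bootstrap broker addresses of the MSK cluster
			GroupID:     consumerGroupID,  // consumer group shared by all replicas of this consumer
			Topic:       topic,            // topic to read from (gps in this example)
			StartOffset: kafka.LastOffset, // with no committed offset, start from the newest message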
		}),
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) {
	return consumer.kafkaReader.ReadMessage(ctx)
}

Create a loader

The loader, represented by the Loader struct, is responsible for loading messages to the target location, which in this case is ElastiCache for Redis. The NewLoader function initializes a new instance of the Loader struct with a logger and a Redis cluster client, which is used to communicate with the ElastiCache cluster. The *redis.ClusterClient is created by the NewRedisClient function, which uses IAM to authenticate the client for Redis actions. This ensures secure and authorized access to the ElastiCache cluster. The Loader struct also contains the Close method to close the Redis client and free up resources.

The following code creates a loader:

import (
	"context"
	"crypto/tls"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/redis/go-redis/v9" // v9 assumed, based on the use of CredentialsProvider
	"github.com/rs/zerolog"
)

type Loader struct {
	logger      *zerolog.Logger
	redisClient *redis.ClusterClient
}

func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader {
	return &Loader{
		logger:      logger,
		redisClient: redisClient,
	}
}

// Close closes the Redis cluster client and frees its connections.
func (loader *Loader) Close() error {
	var err error = nil
	if loader.redisClient != nil {
		err = loader.redisClient.Close()
		loader.logger.Info().Msg("closed redis client")
	}
	return err
}

func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) {
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		NewClient: func(opt *redis.Options) *redis.Client {
			return redis.NewClient(&redis.Options{
				Addr: opt.Addr,
				CredentialsProvider: func() (username string, password string) {
					token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username)
					if err != nil {
						panic(err)
					}
					return opt.Username, token
				},
				PoolSize:    opt.PoolSize,
				PoolTimeout: opt.PoolTimeout,
				TLSConfig:   &tls.Config{InsecureSkipVerify: true},
			})
		},
		Addrs:       addrs,
		Username:    username,
		PoolSize:    100,
		PoolTimeout: 1 * time.Minute,
	})
	pong, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}
	if pong != "PONG" {
		return nil, fmt.Errorf("failed to verify connection to redis server")
	}
	return redisClient, nil
}

Create a handler

A handler contains the business rules and data transformation logic that prepare data before it is loaded into the target location. It acts as a bridge between a consumer and a loader. In this example, the topic name is cars.gps.json, and the message includes two keys, lng and lat, with data type float64. The business logic can be defined in a function like handlerFuncGpsToRedis and then applied as follows:

import (
	"context"
	"encoding/json"
	"fmt"
)

type (
	handlerFunc    func(ctx context.Context, loader *Loader, key, value []byte) error
	handlerFuncMap map[string]handlerFunc
)

// HandlerRedis maps each topic name to the function that handles its messages.
var HandlerRedis = handlerFuncMap{
	"cars.gps.json": handlerFuncGpsToRedis,
}

func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) {
	handlerFunc, exist := funcMap[topic]
	if !exist {
		return nil, fmt.Errorf("failed to find handler func for '%s'", topic)
	}
	return handlerFunc, nil
}

func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error {
	// unmarshal raw data to map
	data := map[string]interface{}{}
	err := json.Unmarshal(value, &data)
	if err != nil {
		return err
	}

	// prepare things to load on redis as geolocation
	name := string(key)
	lng, err := getFloat64ValueFromMap(data, "lng")
	if err != nil {
		return err
	}
	lat, err := getFloat64ValueFromMap(data, "lat")
	if err != nil {
		return err
	}

	// add geolocation to redis
	return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}
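
The handler relies on a getFloat64ValueFromMap helper that the post does not show. The following is a minimal sketch of one way to write it, assuming the helper only needs to look up a key and check that the decoded JSON value is a number; the error messages and the sample payload are illustrative assumptions. It works because encoding/json decodes JSON numbers into float64 when the target is an interface{} value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// getFloat64ValueFromMap returns data[key] as a float64, or an error if the
// key is missing or the value is not a JSON number.
func getFloat64ValueFromMap(data map[string]interface{}, key string) (float64, error) {
	raw, ok := data[key]
	if !ok {
		return 0, fmt.Errorf("key '%s' not found", key)
	}
	value, ok := raw.(float64)
	if !ok {
		return 0, fmt.Errorf("value for key '%s' is %T, not float64", key, raw)
	}
	return value, nil
}

func main() {
	// Sample payload with the two keys the handler expects.
	data := map[string]interface{}{}
	if err := json.Unmarshal([]byte(`{"lng": 126.978, "lat": 37.5665}`), &data); err != nil {
		panic(err)
	}
	lng, err := getFloat64ValueFromMap(data, "lng")
	fmt.Println(lng, err)

	// A missing key yields an error instead of a silent zero value.
	_, err = getFloat64ValueFromMap(data, "speed")
	fmt.Println(err)
}
```

Returning an error for a missing or mistyped field lets the handler skip a malformed message rather than writing a zero coordinate to Redis.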

Build a consumer application with the consumer, loader, and handler

Now you have created the consumer, loader, and handler. The next step is to build a consumer application using them. In a consumer application, you read messages from your stream with a consumer, transform them using a handler, and then load transformed messages into a target location with a loader. These three components are parameterized in a consumer application function such as the one shown in the following code:

import (
	"context"
	"errors"
	"sync"

	"github.com/rs/zerolog"
)

type Connector struct {
	ctx    context.Context
	logger *zerolog.Logger

	consumer *Consumer
	handler  handlerFuncMap
	loader   *Loader
}

func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector {
	return &Connector{
		ctx:    ctx,
		logger: logger,

		consumer: consumer,
		handler:  handler,
		loader:   loader,
	}
}

func (connector *Connector) Close() error {
	var err error = nil
	if connector.consumer != nil {
		err = connector.consumer.Close()
	}
	if connector.loader != nil {
		// keep the consumer's error if there was one
		if loaderErr := connector.loader.Close(); err == nil {
			err = loaderErr
		}
	}
	return err
}

func (connector *Connector) Run() error {
	wg := sync.WaitGroup{}
	defer wg.Wait()
	handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic)
	if err != nil {
		return err
	}
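	for {
		msg, err := connector.consumer.Consume(connector.ctx)
		if err != nil {
			// a canceled context is a normal shutdown; any other error stops the run
			if errors.Is(err, context.Canceled) {
				break
			}
			return err
		}

		wg.Add(1)
		go func(key, value []byte) {
			defer wg.Done()
			// keep the handler error local to this goroutine
			if err := handlerFunc(connector.ctx, connector.loader, key, value); err != nil {
				connector.logger.Err(err).Msg("failed to handle message")
			}
		}(msg.Key, msg.Value)
	}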
	return nil
}

Deploy the consumer application

To achieve maximum parallelism, SOCAR containerizes the consumer application and deploys it into multiple pods on Amazon EKS. Each consumer application contains a unique consumer, loader, and handler. For example, if you need to receive messages from a single topic with five partitions, you can deploy five identical replicas of the consumer application, each running in its own pod. Similarly, if you have two topics with three partitions each, you should deploy two consumer applications with three pods each, resulting in a total of six pods. It’s a best practice to run one consumer application per topic, and the number of pods should match the number of partitions to enable concurrent message processing. The number of pods can be specified in the Kubernetes deployment configuration.
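
The sizing rule can be written as simple arithmetic. Within a Kafka consumer group, each partition is read by at most one consumer, so pods beyond the partition count would sit idle. The following sketch computes the number of consumer applications and the total number of pods from a map of topics to partition counts. The podPlan helper and the topic names are hypothetical and are not part of SOCAR’s code.

```go
package main

import "fmt"

// podPlan applies the rule of one consumer application per topic and one pod
// per partition. It returns the number of consumer applications and the total
// number of pods across all of them.
func podPlan(partitionsByTopic map[string]int) (applications int, totalPods int) {
	for _, partitions := range partitionsByTopic {
		applications++
		totalPods += partitions
	}
	return applications, totalPods
}

func main() {
	// Two topics with three partitions each, as in the example above.
	apps, pods := podPlan(map[string]int{"topic-a": 3, "topic-b": 3})
	fmt.Printf("%d consumer applications, %d pods\n", apps, pods)
}
```

Each per-topic partition count is the value you would set as the replica count of that topic’s Kubernetes deployment.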

There are two stages in the Dockerfile. The first stage is the builder, which installs build tools and dependencies, and builds the application. The second stage is the runner, which uses a smaller base image (Alpine) and copies only the necessary files from the builder stage. It also sets the appropriate user permissions and runs the application. It’s also worth noting that the builder stage uses a specific version of the Golang image, while the runner stage uses a specific version of the Alpine image, both of which are considered to be lightweight and secure images.

The following code is an example of the Dockerfile:

# builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector .

# runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]

Conclusion

In this post, we discussed SOCAR’s approach to building a consumer application that enables IoT real-time streaming from Amazon MSK to target locations such as ElastiCache for Redis. We hope you found this post informative and useful. Thank you for reading!


About the Authors

SangSu Park is the Head of Operation Group at SOCAR. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

JaeHong Ahn is a DevOps Engineer in SOCAR’s cloud infrastructure team. He is dedicated to promoting collaboration between developers and operators. He enjoys creating DevOps tools and is committed to using his coding abilities to help build a better world. He loves to cook delicious meals as a private chef for his wife.

Younggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

What’s new with Amazon MWAA support for Apache Airflow version 2.4.3

Post Syndicated from Parnab Basak original https://aws.amazon.com/blogs/big-data/whats-new-with-amazon-mwaa-support-for-apache-airflow-version-2-4-3/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it simple to set up and operate end-to-end data pipelines in the cloud at scale. Amazon MWAA supports multiple versions of Apache Airflow (v1.10.12, v2.0.2, and v2.2.2). Earlier in 2023, we added support for Apache Airflow v2.4.3 so you can enjoy the same scalability, availability, security, and ease of management with Airflow’s most recent improvements. Additionally, with Apache Airflow v2.4.3 support, Amazon MWAA has upgraded to Python v3.10.8, which supports newer libraries such as OpenSSL 1.1.1 as well as major new features and improvements.

In this post, we provide an overview of the features and capabilities of Apache Airflow v2.4.3 and how you can set up or upgrade your Amazon MWAA environment to accommodate Apache Airflow v2.4.3 as you orchestrate using workflows in the cloud at scale.

New feature: Data-aware scheduling using datasets

With the release of Apache Airflow v2.4.0, Airflow introduced datasets. An Airflow dataset is a stand-in for a logical grouping of data that can trigger a Directed Acyclic Graph (DAG) in addition to regular DAG triggering mechanisms such as cron expressions, timedelta objects, and Airflow timetables. The following are some of the attributes of a dataset:

  • Datasets may be updated by upstream producer tasks, and updates to such datasets contribute to scheduling downstream consumer DAGs.
  • You can create smaller, more self-contained DAGs, which chain together into a larger data-based workflow using datasets.
  • You have an additional option now to create inter-DAG dependencies using datasets besides ExternalTaskSensor or TriggerDagRunOperator. You should consider using this dependency if you have two DAGs related via an irregular dataset update. This type of dependency also provides you with increased observability into the dependencies between your DAGs and datasets in the Airflow UI.

How data-aware scheduling works

You need to define three things:

  • A dataset, or multiple datasets
  • The tasks that will update the dataset
  • The DAG that will be scheduled when one or more datasets are updated

The following diagram illustrates the workflow.

The producer DAG has a task that creates or updates the dataset defined by a Uniform Resource Identifier (URI). Airflow schedules the consumer DAG after the dataset has been updated. A dataset will be marked as updated only if the producer task completes successfully. If the task fails or is skipped, no update occurs, and the consumer DAG will not be scheduled. If updates to a dataset trigger multiple subsequent DAGs, you can use the Airflow configuration option max_active_tasks_per_dag to control the parallelism of the consumer DAG and reduce the chance of overloading the system.

Let’s demonstrate this with a code example.

Prerequisites to build a data-aware scheduled DAG

You must have the following prerequisites:

  • An Amazon Simple Storage Service (Amazon S3) bucket to upload datasets to. This can be a separate prefix in your existing S3 bucket configured for your Amazon MWAA environment, or it can be a completely different S3 bucket that you identify to store your data in.
  • An Amazon MWAA environment configured with Apache Airflow v2.4.3. The Amazon MWAA execution role should have access to read and write to the S3 bucket configured to upload datasets. You need to grant this access only if it’s a different bucket than the Amazon MWAA bucket.

The following diagram illustrates the solution architecture.

The workflow steps are as follows:

  1. The producer DAG makes an API call to a publicly hosted API to retrieve data.
  2. After the data has been retrieved, it’s stored in the S3 bucket.
  3. The update to this dataset subsequently triggers the consumer DAG.

You can access the producer and consumer code in the GitHub repo.

Test the feature

To test this feature, run the producer DAG. After it’s complete, verify that a file named test.csv is generated in the specified S3 folder. Verify in the Airflow UI that the consumer DAG has been triggered by updates to the dataset and that it runs to completion.

There are two restrictions on the dataset URI:

  • It must be a valid URI, which means it must be composed of only ASCII characters
  • The URI scheme can’t be airflow (this scheme is reserved for future use)

Other notable changes in Apache Airflow v2.4.3

Apache Airflow v2.4.3 has the following additional changes:

  1. Deprecation of schedule_interval and timetable arguments. Airflow v2.4.0 added a new DAG argument schedule that can accept a cron expression, timedelta object, timetable object, or list of dataset objects.
  2. Removal of experimental Smart Sensors. Smart Sensors were added in v2.0 and were deprecated in favor of deferrable operators in v2.2, and have now been removed. Deferrable operators are not yet supported on Amazon MWAA, but will be offered in a future release.
  3. Implementation of ExternalPythonOperator that can help you run some of your tasks with a different set of Python libraries than other tasks (and other than the main Airflow environment).

For detailed release documentation with sample code, visit the Apache Airflow v2.4.0 Release Notes.

New feature: Dynamic task mapping

Dynamic task mapping was a new feature introduced in Apache Airflow v2.3, which has also been extended in v2.4. Dynamic task mapping lets DAG authors create tasks dynamically based on current data. Previously, DAG authors needed to know how many tasks were needed in advance.

This is similar to defining your tasks in a loop, but instead of having the DAG file fetch the data and do that itself, the scheduler can do this based on the output of a previous task. Right before a mapped task is run, the scheduler will create n copies of the task, one for each input. The following diagram illustrates this workflow.

It’s also possible to have a task operate on the collected output of a mapped task, commonly known as map and reduce. This feature is particularly useful if you want to externally process various files, evaluate multiple machine learning models, or process a varying amount of data based on a SQL request.

How dynamic task mapping works

Let’s see an example using the reference code available in the Airflow documentation.

The following code results in a DAG with n+1 tasks, with n mapped invocations of count_lines, each called to process line counts, and a total that is the sum of each of the count_lines. Here n represents the number of input files uploaded to the S3 bucket.
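The shape of that DAG can be modeled in plain Python. The block below is a sketch of the map-and-reduce pattern only, not Airflow code: in Airflow, the scheduler creates the mapped count_lines task instances at run time, whereas here a list comprehension stands in for the expansion. The file names and contents are made up for illustration.

```python
def count_lines(text: str) -> int:
    """Mapped step: count the lines in one input file."""
    return len(text.splitlines())


def total(counts: list[int]) -> int:
    """Reduce step: sum the per-file line counts."""
    return sum(counts)


# Hypothetical stand-ins for n=4 files uploaded to the S3 bucket.
files = {
    "file1.txt": "a\nb\nc\n",
    "file2.txt": "a\n",
    "file3.txt": "a\nb\n",
    "file4.txt": "a\nb\nc\nd\n",
}

# n mapped invocations, one per input file.
counts = [count_lines(body) for body in files.values()]
# One reduce task that operates on the collected output.
final_sum = total(counts)
```

With four inputs, this corresponds to four count_lines invocations plus one total, which is the n+1 structure described above.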

With n=4 files uploaded, the resulting DAG would look like the following figure.

Prerequisites to build a dynamic task mapped DAG

You need the following prerequisites:

  • An S3 bucket to upload files in. This can be a separate prefix in your existing S3 bucket configured for your Amazon MWAA environment, or it can be a completely different bucket that you identify to store your data in.
  • An Amazon MWAA environment configured with Apache Airflow v2.4.3. The Amazon MWAA execution role should have read access to the S3 bucket configured to upload files. The latter is only needed if it’s a different bucket than the Amazon MWAA bucket.

You can access the code in the GitHub repo.

Test the feature

Upload the four sample text files from the local data folder to an S3 bucket data folder. Run the dynamic_task_mapping DAG. When it’s complete, verify from the Airflow logs that the final sum is equal to the sum of the count lines of the individual files.

There are two limits that Airflow allows you to place on a task:

  • The number of mapped task instances that can be created as the result of expansion
  • The number of mapped tasks that can run at once

For detailed documentation with sample code, visit the Apache Airflow v2.3.0 Release Notes.

New feature: Upgraded Python version

With Apache Airflow v2.4.3 support, Amazon MWAA has upgraded to Python v3.10.8, providing support for newer Python libraries, features, and improvements. Python v3.10 has slots for data classes, structural pattern matching (match statements), clearer and better Union typing, and parenthesized context managers. Upgrading to Python v3.10 should also help you align with security standards by mitigating the risk of older versions of Python such as 3.7, which is fast approaching its end of security support.

With structural pattern matching in Python v3.10, you can now use match-case statements instead of chains of if-else statements or dictionary lookups to simplify the code. Prior to Python v3.10, you might have used if statements, isinstance calls, exceptions and membership tests against objects, dictionaries, lists, tuples, and sets to verify that the structure of the data matches one or more patterns. The following code shows what an ad hoc pattern matching engine might have looked like prior to Python v3.10:

def http_error(status):
    # Each status code needs its own branch, even when the result is the same.
    if status == 200:
        return 'OK'
    elif status == 400:
        return 'Bad request'
    elif status == 401:
        return 'Not allowed'
    elif status == 403:
        return 'Not allowed'
    elif status == 404:
        return 'Not allowed'
    else:
        return 'Something is wrong'

With structural pattern matching in Python v3.10, the code is as follows:

def http_error(status):
    match status:
        case 200:
            return 'OK'
        case 400:
            return 'Bad request'
        case 401 | 403 | 404:
            return 'Not allowed'
        case _:
            return 'Something is wrong'

Python v3.10 also carries forward the performance improvements introduced in Python v3.9 using the vectorcall protocol. vectorcall makes many common function calls faster by minimizing or eliminating temporary objects created for the call. In Python 3.9, several Python built-ins (range, tuple, set, frozenset, list, and dict) use vectorcall internally to speed up runs. The second big performance enhancement is more efficient parsing of Python source code, using the new parser for the CPython runtime.

For a full list of Python v3.10 release highlights, refer to What’s New In Python 3.10.

The code is available in the GitHub repo.

Set up a new Apache Airflow v2.4.3 environment

You can set up a new Apache Airflow v2.4.3 environment in your account and preferred Region using either the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using either AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform.

When you have successfully created an Apache Airflow v2.4.3 environment in Amazon MWAA, the following packages are automatically installed on the scheduler and worker nodes along with other provider packages:

  • apache-airflow-providers-amazon==6.0.0
  • python==3.10.8

For a complete list of provider packages installed, refer to Apache Airflow provider packages installed on Amazon MWAA environments. Note that some imports and operator names have changed in the new provider package in order to standardize the naming convention across the provider package. For a complete list of provider package changes, refer to the package changelog.

Upgrade from Apache Airflow v2.0.2 or v2.2.2 to Apache Airflow v2.4.3

Currently, Amazon MWAA doesn’t support in-place upgrades of existing environments for older Apache Airflow versions. In this section, we show how you can transfer your data from your existing Apache Airflow v2.0.2 or v2.2.2 environment to Apache Airflow v2.4.3:

  1. Create a new Apache Airflow v2.4.3 environment.
  2. Copy your DAGs, custom plugins, and requirements.txt resources from your existing v2.0.2 or v2.2.2 S3 bucket to the new environment’s S3 bucket.
    • If you use requirements.txt in your environment, you need to update the --constraint to v2.4.3 constraints and verify that the current libraries and packages are compatible with Apache Airflow v2.4.3
    • With Apache Airflow v2.4.3, the list of provider packages Amazon MWAA installs by default for your environment has changed. Note that some imports and operator names have changed in the new provider package in order to standardize the naming convention across the provider package. Compare the list of provider packages installed by default in Apache Airflow v2.2.2 or v2.0.2, and configure any additional packages you might need for your new v2.4.3 environment. It’s advised to use the aws-mwaa-local-runner utility to test out your new DAGs, requirements, plugins, and dependencies locally before deploying to Amazon MWAA.
  3. Test your DAGs using the new Apache Airflow v2.4.3 environment.
  4. After you have confirmed that your tasks completed successfully, delete the v2.0.2 or v2.2.2 environment.

Conclusion

In this post, we talked about the new features of Apache Airflow v2.4.3 and how you can get started using it in Amazon MWAA. Try out new features like data-aware scheduling and dynamic task mapping, along with the other enhancements and Python v3.10.


About the authors

Parnab Basak is a Solutions Architect and a Serverless Specialist at AWS. He specializes in creating new solutions that are cloud native using modern software development practices like serverless, DevOps, and analytics. Parnab works closely in the analytics and integration services space helping customers adopt AWS services for their workflow orchestration needs.

How we built Network Analytics v2

Post Syndicated from Alex Forster original http://blog.cloudflare.com/building-network-analytics-v2/

Network Analytics v2 is a fundamental redesign of the backend systems that provide real-time visibility into network layer traffic patterns for Magic Transit and Spectrum customers. In this blog post, we'll dive into the technical details behind this redesign and discuss some of the more interesting aspects of the new system.

To protect Cloudflare and our customers against Distributed Denial of Service (DDoS) attacks, we operate a sophisticated in-house DDoS detection and mitigation system called dosd. It takes samples of incoming packets, analyzes them for attacks, and then deploys mitigation rules to our global network which drop any packets matching specific attack fingerprints. For example, a simple network layer mitigation rule might say “drop UDP/53 packets containing responses to DNS ANY queries”.

In order to give our Magic Transit and Spectrum customers insight into the mitigation rules that we apply to their traffic, we introduced a new reporting system called "Network Analytics" back in 2020. Network Analytics is a data pipeline that analyzes raw packet samples from the Cloudflare global network. At a high level, the analysis process involves trying to match each packet sample against the list of mitigation rules that dosd has deployed, so that it can infer whether any particular packet sample was dropped due to a mitigation rule. Aggregated time-series data about these packet samples is then rolled up into one-minute buckets and inserted into a ClickHouse database for long-term storage. The Cloudflare dashboard queries this data using our public GraphQL APIs, and displays the data to customers using interactive visualizations.

What was wrong with v1?

This original implementation of Network Analytics delivered a ton of value to customers and has served us well. However, in the years since it was launched, we have continued to significantly improve our mitigation capabilities by adding entirely new mitigation systems like Advanced TCP Protection (otherwise known as flowtrackd) and Magic Firewall. The original version of Network Analytics only reports on mitigations created by dosd, which meant we had a reporting system that was showing incomplete information.

Adapting the original version of Network Analytics to work with Magic Firewall would have been relatively straightforward. Since firewall rules are “stateless”, we can tell whether a firewall rule matches a packet sample just by looking at the packet itself. That’s the same thing we were already doing to figure out whether packets match dosd mitigation rules.

However, despite our efforts, adapting Network Analytics to work with flowtrackd turned out to be an insurmountable problem. flowtrackd is “stateful”, meaning it determines whether a packet is part of a legitimate connection by tracking information about the other packets it has seen previously. The original Network Analytics design is incompatible with stateful systems like this, since that design made an assumption that the fate of a packet can be determined simply by looking at the bytes inside it.

Rethinking our approach

Rewriting a working system is not usually a good idea, but in this case it was necessary since the fundamental assumptions made by the old design were no longer true. When starting over with Network Analytics v2, it was clear to us that the new design not only needed to fix the deficiencies of the old design, it also had to be flexible enough to grow to support future products that we haven’t even thought of yet. To meet this high bar, we needed to really understand the core principles of network observability.

In the world of on-premise networks, packets typically chain through a series of appliances that each serve their own special purposes. For example, a packet may first pass through a firewall, then through a router, and then through a load balancer, before finally reaching the intended destination. The links in this chain can be thought of as independent “network functions”, each with some well-defined inputs and outputs.

A key insight for us was that, if you squint a little, Cloudflare’s software architecture looks very similar to this. Each server receives packets and chains them through a series of independent and specialized software components that handle things like DDoS mitigation, firewalling, reverse proxying, etc.

After noticing this similarity, we decided to explore how people with traditional networks monitor them. Universally, the answer is either Netflow or sFlow.

Nearly all on-premise hardware appliances can be configured to send a stream of Netflow or sFlow samples to a centralized flow collector. Traditional network operators tend to take these samples at many different points in the network, in order to monitor each device independently. This was different from our approach, which was to take packet samples only once, as soon as they entered the network and before performing any processing on them.

Another interesting thing we noticed was that Netflow and sFlow samples contain more than just information about packet contents. They also contain lots of metadata, such as the interface that packets entered and exited on, whether they were passed or dropped, which firewall or ACL rule they hit, and more. The metadata format is also extensible, so that devices can include information in their samples which might not make sense for other samples to contain. This flexibility allows flow collectors to offer rich reporting without necessarily having to understand the functions that each device performs on a network.

The more we thought about what kind of features and flexibility we wanted in an analytics system, the more we began to appreciate the elegance of traditional network monitoring. We realized that we could take advantage of the similarities between Cloudflare’s software architecture and “network functions” by having each software component emit its own packet samples with its own context-specific metadata attached.

Even though it seemed counterintuitive for our software to emit multiple streams of packet samples this way, we realized through taking inspiration from traditional network monitoring that doing so was exactly how we could build the extensible and future-proof observability that we needed.

Design & implementation

The implementation of Network Analytics v2 could be broken down into two separate pieces of work. First, we needed to build a new data pipeline that could receive packet samples from different sources, then normalize those samples and write them to long-term storage. We called this data pipeline samplerd – the “sampler daemon”.

The samplerd pipeline is relatively small and straightforward. It implements a few different interfaces that other software can use to send it metadata-rich packet samples. It then normalizes these samples and forwards them for postprocessing and insertion into a ClickHouse database.

The other, larger piece of work was to modify existing Cloudflare systems and make them send packet samples to samplerd. The rest of this post will cover a few interesting technical challenges that we had to overcome to adapt these systems to work with samplerd.

l4drop

The first system that incoming packets enter is our XDP daemon, called xdpd. In short, xdpd manages the installation of multiple XDP programs: a packet sampler, l4drop, and L4LB. l4drop is where many types of attacks are mitigated. Mitigations done at this level are very cheap, because they happen so early in the network stack.

Before introducing samplerd, these XDP programs were organized like this:

An incoming packet goes through a sampler that will emit a packet sample for some packets. It then enters l4drop, a set of programs that will decide the fate of a particular packet. Finally, L4LB is in charge of layer 4 load balancing.

It’s critical that the samples are emitted even for packets that get dropped further down in the pipeline, because that provides visibility into what’s dropped. That’s useful both for customers, who get a more comprehensive view in their dashboards, and for us, as we continuously adapt our mitigations as attacks change.

In l4drop’s original configuration, a packet sample is emitted prior to the mitigation decision. Thus, that sample can’t record the mitigation action that’s taken on that particular packet.

samplerd wants packet samples to include the mitigation outcome and other metadata that indicates why a particular mitigation decision was taken. For instance, a packet may be dropped because it matched an attack mitigation signature. Or it may pass because it matched a rate limiting rule and it was under the threshold for that rule. All of this is valuable information that needs to be shown to customers.

Given this requirement, the first idea we had was to simply move the sampler after l4drop and have l4drop just mark the packet as “to be dropped”, along with metadata for the reason why. The sampler component would then have all the necessary details to emit a sample with the final fate of the packet and its associated metadata. After emitting the sample, the sampler would drop or pass the packet.

However, this requires copying all the metadata associated with the dropping decision for every single packet, whether it will be sampled or not. The cost of this copying proved prohibitive considering that every packet entering Cloudflare goes through the xdpd programs.

So we went back to the drawing board. What the mitigation programs actually need to know is whether they have to copy the metadata, and that is only necessary if a particular packet will be sampled. That’s why it made sense to effectively split the sampler into two parts, sandwiching the programs that make the mitigation decision between them. First, we make the sampling decision, then we go through the mitigation decision programs. These programs can then decide to copy metadata only when a packet will be sampled. They will however always mark a packet with a DROP or PASS mark. Then the second part of the sampler checks both the sampling mark and the DROP/PASS mark. Based on those marks, it builds a sample if necessary and drops or passes the packet.
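The ordering described here can be sketched in plain Python. The real programs are XDP programs, so this is only a model of the control flow under stated assumptions: the Packet fields, the function names, and the single port-53 rule are all hypothetical.

```python
import random
from dataclasses import dataclass, field


@dataclass
class Packet:
    dst_port: int
    sampled: bool = False   # mark set by the first half of the sampler
    verdict: str = "PASS"   # DROP/PASS mark set by the mitigation programs
    metadata: dict = field(default_factory=dict)


def sampling_decision(pkt: Packet, rate: float, rng: random.Random) -> None:
    """First half of the sampler: decide up front whether to sample."""
    pkt.sampled = rng.random() < rate


def mitigate(pkt: Packet) -> None:
    """Mitigation step with one hypothetical rule (drop port 53)."""
    matched = pkt.dst_port == 53
    pkt.verdict = "DROP" if matched else "PASS"
    # The metadata copy is paid for only when the packet will be sampled.
    if pkt.sampled:
        pkt.metadata = {"reason": "matched rule" if matched else "no rule matched"}


def emit_and_finish(pkt: Packet, samples: list) -> bool:
    """Second half of the sampler: emit a sample if marked, then apply the verdict."""
    if pkt.sampled:
        samples.append((pkt.dst_port, pkt.verdict, pkt.metadata))
    return pkt.verdict == "PASS"


def process(pkt: Packet, rate: float, rng: random.Random, samples: list) -> bool:
    sampling_decision(pkt, rate, rng)
    mitigate(pkt)
    return emit_and_finish(pkt, samples)
```

A packet that is not selected for sampling still receives its DROP or PASS mark, but its metadata dictionary stays empty, which is the saving the split is meant to achieve.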

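Given how tightly the sampler is now coupled with the rest of l4drop, it’s not a standalone part of xdpd anymore. In the final layout, the sampling decision comes first, the l4drop mitigation programs run next, and the sample is emitted (if needed) just before the packet is dropped or passed on to L4LB.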

iptables

Another of our mitigation layers is iptables. We use it for some types of mitigations that we can’t perform in l4drop, like stateful connection tracking. iptables mitigations are organized as a list of rules that an incoming packet will be evaluated against. It’s also possible for a rule to jump to another rule when some conditions are met. Some of these rules will perform rate limiting, which will only drop packets beyond a certain threshold. For instance, we might drop all packets beyond a 10 packet-per-second threshold.

Prior to the introduction of samplerd, our typical rules would match on some characteristics of the packet – say, the IP and port – and make a decision whether to immediately drop or pass the packet.

To adapt our iptables rules to samplerd, we need to make them emit annotated samples, so that we can know why a decision was taken. To this end, one idea would be to just make the rules which drop packets also emit an nflog sample with a certain probability. One of the issues with that approach has to do with rate limiting rules. A packet may match such a rule, but the packet may be under the threshold and so that packet gets passed further down the line. Sampling only on drop doesn’t work here, because we also want to sample those passed packets: it’s important for a customer to know what was passed and what was dropped by the rate limiter. But if every rule emitted its own samples, a packet that passes the rate limiter and is then dropped by a rule further down the line would have multiple chances to be sampled, causing oversampling of some parts of the traffic. That would introduce statistical distortions in the sampled data.

To solve this, we can once again separate these steps like we did in l4drop, and make several sets of rules. First, the sampling decision is made by the first set of rules. Then, the pass-or-drop decision is made by the second set of rules. Finally, the sample can be emitted (if necessary), and then the packet can be passed or dropped by the third set of rules.

To communicate between rules we use Linux packet markings. For instance, a mark will be placed on a packet to signal that the packet will be sampled, and another mark will signify that the packet matched a particular rule and that it needs to be dropped.

For incoming packets, the rule in charge of the random sampling decision is evaluated first. Then the mitigation rules are evaluated next, in a specific order. When one of those rules decides to drop a packet, it jumps straight to the last set of rules, which will emit a sample if necessary before dropping. If no mitigation rule matches, eventually packets fall through to the last set of rules, where they will match a generic pass rule. That rule will emit a sample if necessary and pass the packet down the stack for further processing. By organizing rules in stages this way, we won’t ever double-sample packets.
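The staged evaluation can be modeled in a few lines of Python. This is a sketch of the idea rather than real iptables configuration: the rules, the ports, the size limit, and the simple counter that stands in for a per-second rate limiter are all assumptions, and the sampling mark is passed in explicitly so the example stays deterministic.

```python
from typing import Callable, Optional

# A rule returns "DROP" to jump to the final stage, or None to fall through.
Rule = Callable[[dict], Optional[str]]


def make_rate_limiter(threshold: int) -> Rule:
    """Hypothetical limiter: drop port-123 packets beyond `threshold`."""
    seen = 0

    def rule(pkt: dict) -> Optional[str]:
        nonlocal seen
        if pkt["port"] != 123:
            return None
        seen += 1
        if seen > threshold:
            pkt["reason"] = "rate limit exceeded"
            return "DROP"
        pkt["reason"] = "under rate limit"
        return None  # passed by this rule; later rules still apply

    return rule


def drop_oversized(pkt: dict) -> Optional[str]:
    """Hypothetical later rule: drop packets above an assumed size limit."""
    if pkt.get("size", 0) > 1400:
        pkt["reason"] = "oversized"
        return "DROP"
    return None


def run_chain(pkt: dict, sample_mark: bool, rules: list, samples: list) -> str:
    pkt["sampled"] = sample_mark      # stage 1: sampling decision
    verdict = "PASS"
    for rule in rules:                # stage 2: ordered mitigation rules
        if rule(pkt) == "DROP":
            verdict = "DROP"
            break
    if pkt["sampled"]:                # stage 3: at most one sample per packet
        samples.append((pkt["port"], verdict, pkt.get("reason")))
    return verdict


rules = [make_rate_limiter(threshold=2), drop_oversized]
samples = []
# Passes the rate limiter, then gets dropped by a later rule: one sample only.
first = run_chain({"port": 123, "size": 1500}, True, rules, samples)
second = run_chain({"port": 123}, True, rules, samples)
third = run_chain({"port": 123}, True, rules, samples)
```

Because the only place a sample is emitted is the final stage, a packet that touches several rules still produces exactly one sample.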

ClickHouse & GraphQL

Once the samplerd daemon has the samples from the various mitigation systems, it does some light processing and ships those samples to an inserter, which stores them in ClickHouse. The inserter further enriches the metadata present in the sample, for instance by identifying the account associated with a particular destination IP. It also identifies ongoing attacks and adds a unique attack ID to each sample that is part of an attack.

We designed the inserters so that we’ll never need to change the data once it has been written, so that we can sustain high levels of insertion. Part of how we achieved this was by using ClickHouse’s MergeTree table engine. However, for improved performance, we have also used a less common ClickHouse table engine, called AggregatingMergeTree. Let’s dive into this using a simplified example.

Each packet sample is stored in a table that looks like the below:

| Attack ID | Dest IP | Dest Port | Sample Interval (SI) |
|-----------|---------|-----------|----------------------|
| abcd      | 1.1.1.1 | 53        | 1000                 |
| abcd      | 1.0.0.1 | 53        | 1000                 |

The sample interval is the number of packets that went through between two samples, as we are using ABR (adaptive bit rate) sampling.
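One consequence of storing the sample interval is that packet counts can be estimated from samples alone. The following sketch assumes that each sample stands in for roughly SI packets; the dictionary layout and key names are illustrative and not the real table schema.

```python
# The two example rows from the table above, as plain dictionaries.
samples = [
    {"attack_id": "abcd", "dest_ip": "1.1.1.1", "dest_port": 53, "sample_interval": 1000},
    {"attack_id": "abcd", "dest_ip": "1.0.0.1", "dest_port": 53, "sample_interval": 1000},
]

# Assumption: each sample represents about `sample_interval` packets,
# so summing the intervals estimates the total packet count.
estimated_packets = sum(s["sample_interval"] for s in samples)
```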

These tables are then queried through the GraphQL API, either directly or by the dashboard. This required us to build a view of all the samples for a particular attack, to identify (for example) a fixed destination IP. These attacks may span days or even weeks and so these queries could potentially be costly and slow. For instance, a naive query to know whether the attack “abcd” has a fixed destination port or IP may look like this:

SELECT if(uniq(dest_ip) == 1, any(dest_ip), NULL), if(uniq(dest_port) == 1, any(dest_port), NULL)
FROM samples
WHERE attack_id = 'abcd'

In the above query, we ask ClickHouse for a lot more data than we should need. We only really want to know whether there is one value or multiple values, yet we ask for an estimation of the number of unique values. One way to know if all values are the same (for values that can be ordered) is to check whether the maximum value is equal to the minimum. So we could rewrite the above query as:

SELECT if(min(dest_ip) == max(dest_ip), any(dest_ip), NULL), if(min(dest_port) == max(dest_port), any(dest_port), NULL)
FROM samples
WHERE attack_id = 'abcd'

And the good news is that storing the minimum or the maximum takes very little space, typically the size of the column itself, as opposed to keeping the state that uniq() might require. It’s also very easy to store and update as we insert. So to speed up that query, we have added a precomputed table with running minimum and maximum using the AggregatingMergeTree engine. This is a special ClickHouse table engine that can compute and store the result of an aggregate function on a particular key. In our case, we will use the attack ID as the key to group on, like this:

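| Attack ID | min(Dest IP) | max(Dest IP) | min(Dest Port) | max(Dest Port) | sum(SI) |
|-----------|--------------|--------------|----------------|----------------|---------|
| abcd      | 1.0.0.1      | 1.1.1.1      | 53             | 53             | 2000    |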

Note: this can be generalized to many aggregating functions like sum(). The constraint on the function is that it gives the same result whether it’s given the whole set all at once or whether we apply the function to the value it returned on a subset and another value from the set.
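The constraint in the note can be checked with a small Python experiment. This sketch uses made-up port values and Python's built-in min, max, and sum; it illustrates the property only and says nothing about how ClickHouse stores aggregate states internally.

```python
values = [53, 53, 443, 80, 53]
part_a, part_b = values[:2], values[2:]

# min, max, and sum give the same answer whether applied to the whole set
# or to the results already computed on two subsets.
mergeable = {
    fn.__name__: fn(values) == fn([fn(part_a), fn(part_b)])
    for fn in (min, max, sum)
}

# A distinct count does not have this property: the per-subset counts
# cannot be combined without keeping the underlying sets around.
distinct_whole = len(set(values))
distinct_from_parts = len(set(part_a)) + len(set(part_b))
```

Here the three built-ins all merge correctly, while the distinct count computed from the parts (4) disagrees with the true value (3), which is why a running minimum and maximum are so much cheaper to maintain than the state a uniqueness estimate needs.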

Then the query that we run can be much quicker and simpler by querying our small aggregating table. In our experience, that table is roughly 0.002% of the original data size, although admittedly not all columns of the original table are present.

And we can use that to build a SQL view that would look like this for our example:

SELECT if(min_dest_ip == max_dest_ip, min_dest_ip, NULL), if(min_dest_port == max_dest_port, min_dest_port, NULL)
FROM aggregated_samples
WHERE attack_id = 'abcd'

| Attack ID | Dest IP | Dest Port | Σ    |
|-----------|---------|-----------|------|
| abcd      | NULL    | 53        | 2000 |

Implementation detail: in practice, it is possible that a row in the aggregated table gets split across multiple partitions. In that case, we will have more than one row for a particular attack ID. So in production we have to take the min or max of all the rows in the aggregating table. That’s usually only three to four rows, so it’s still much faster than going over potentially thousands of samples spanning multiple days. In practice, the query we use in production is thus closer to:

SELECT if(min(min_dest_ip) == max(max_dest_ip), min(min_dest_ip), NULL), if(min(min_dest_port) == max(max_dest_port), min(min_dest_port), NULL)
FROM aggregated_samples
WHERE attack_id = 'abcd'
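The logic of that query can be mirrored in Python to show why taking the min of the mins and the max of the maxes still answers the question. The partial rows below are hypothetical: they show how the example attack might be split across two partitions, and the fixed_value helper is illustrative only. IP addresses are compared with the standard ipaddress module so that ordering is numeric rather than textual.

```python
from ipaddress import ip_address

# Hypothetical partial rows for attack "abcd", one per partition.
partial_rows = [
    {"min_dest_ip": "1.0.0.1", "max_dest_ip": "1.0.0.1", "min_dest_port": 53, "max_dest_port": 53},
    {"min_dest_ip": "1.1.1.1", "max_dest_ip": "1.1.1.1", "min_dest_port": 53, "max_dest_port": 53},
]


def fixed_value(rows, column, key=lambda v: v):
    """Return the single value of `column` across all rows, or None if it varies."""
    lo = min((r[f"min_{column}"] for r in rows), key=key)
    hi = max((r[f"max_{column}"] for r in rows), key=key)
    return lo if key(lo) == key(hi) else None


fixed_ip = fixed_value(partial_rows, "dest_ip", key=ip_address)
fixed_port = fixed_value(partial_rows, "dest_port")
```

For these rows the destination IP varies, so fixed_ip is None, while fixed_port is 53, matching the result table shown earlier.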

Takeaways

Rewriting Network Analytics was a bet that has paid off. Customers now have a more accurate and higher fidelity view of their network traffic. Internally, we can also now troubleshoot and fine tune our mitigation systems much more effectively. And as we develop and deploy new mitigation systems in the future, we are confident that we can adapt our reporting in order to support them.