Tag Archives: Analytics

AWS Glue Data Catalog now supports automatic compaction of Apache Iceberg tables

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/aws-glue-data-catalog-now-supports-automatic-compaction-of-apache-iceberg-tables/

Today, we’re making available a new capability of AWS Glue Data Catalog to allow automatic compaction of transactional tables in the Apache Iceberg format. This allows you to keep your transactional data lake tables always performant.

Data lakes were initially designed primarily for storing vast amounts of raw, unstructured, or semi structured data at a low cost, and they were commonly associated with big data and analytics use cases. Over time, the number of possible use cases for data lakes has evolved as organizations have recognized the potential to use data lakes for more than just reporting, requiring the inclusion of transactional capabilities to ensure data consistency.

Data lakes also play a pivotal role in data quality, governance, and compliance, particularly as data lakes store increasing volumes of critical business data, which often requires updates or deletion. Data-driven organizations also need to keep their back end analytics systems in near real-time sync with customer applications. This scenario requires transactional capabilities on your data lake to support concurrent writes and reads without data integrity compromise. Finally, data lakes now serve as integration points, necessitating transactions for safe and reliable data movement between various sources.

To support transactional semantics on data lake tables, organizations adopted an open table format (OTF), such as Apache Iceberg. Adopting OTF formats comes with its own set of challenges: transforming existing data lake tables from Parquet or Avro formats to an OTF format, managing a large number of small files as each transaction generates a new file on Amazon Simple Storage Service (Amazon S3), or managing object and meta-data versioning at scale, just to name a few. Organizations are typically building and managing their own data pipelines to address these challenges, leading to additional undifferentiated work on infrastructure. You need to write code, deploy Spark clusters to run your code, scale the cluster, manage errors, and so on.

When talking with our customers, we learned that the most challenging aspect is the compaction of individual small files produced by each transactional write on tables into a few large files. Large files are faster to read and scan, making your analytics jobs and queries faster to execute. Compaction optimizes the table storage with larger-sized files. It changes the storage for the table from a large number of small files to a small number of larger files. It reduces metadata overhead, lowers network round trips to S3, and improves performance. When you use engines that charge for the compute, the performance improvement is also beneficial to the cost of usage as the queries require less compute capacity to run.

But building custom pipelines to compact and optimize Iceberg tables is time-consuming and expensive. You have to manage the planning, provision infrastructure, and schedule and monitor the compaction jobs. This is why we launch automatic compaction today.

Let’s see how it works
To show you how to enable and monitor automatic compaction on Iceberg tables, I start from the AWS Lake Formation page or the AWS Glue page of the AWS Management Console. I have an existing database with tables in the Iceberg format. I execute transactions on this table over the course of a couple of days, and the table starts to fragment into small files on the underlying S3 bucket.

List of Iceberg table on Lake Formation console

I select the table on which I want to enable compaction, and then I select Enable compaction.

View details of a table in lake formation

An IAM role is required to pass permissions to the Lake Formation service to access my AWS Glue tables, S3 buckets, and CloudWatch log streams. Either I choose to create a new IAM role, or I select an existing one. Your existing role must have lakeformation:GetDataAccess and glue:UpdateTable permissions on the table. The role also needs logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents, to “arn:aws:logs:*:your_account_id:log-group:/aws-lakeformation-acceleration/compaction/logs:*“. The role trusted permission service name must be set to glue.amazonaws.com.

Then, I select Turn on compaction. Et voilà! Compaction is automatic; there is nothing to manage on your side.

The service starts to measure the table’s rate of change. As Iceberg tables can have multiple partitions, the service calculates this change rate for each partition and schedules managed jobs to compact the partitions where this rate of change breaches a threshold value.

When the table accumulates a high number of changes, you will be able to view the Compaction history under the Optimization tab in the console.

Lake formation compaction history in the console

You can also monitor the whole process either by observing the number of files on your S3 bucket (use the NumberOfObjects metric) or one of the two new Lake Formation metrics: numberOfBytesCompacted or numberOfFilesCompacted.

Iceberg table compaction metrics in the cloudwatch console

In addition to the AWS console, there are six new APIs that expose this new capability:CreateTableOptimizer, BatchGetTableOptimizer , UpdateTableOptimizer, DeleteTableOptimizer, GetTableOptimizer, and ListTableOptimizerRuns. These APIs are available in the AWS SDKs and AWS Command Line Interface (AWS CLI). As usual, don’t forget to update the SDK or the CLI to their latest versions to get access to these new APIs.

Things to know
As we launched this new capability today, there are a couple of additional points I’d like to share with you:

Availability
This new capability is available starting today in all AWS Regions where AWS Glue Data Catalog is available.

The pricing metric is the data processing unit (DPU), a relative measure of processing power that consists of 4 vCPUs of compute capacity and 16 GB of memory. There is a charge per DPU/hours metered by second, with a minimum of one minute.

Now it’s time to decommission your existing compaction data pipeline and switch to this new, entirely managed capability today.

— seb

Amazon Kinesis Data Streams: celebrating a decade of real-time data innovation

Post Syndicated from Roy Wang original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-streams-celebrating-a-decade-of-real-time-data-innovation/

Data is a key strategic asset for every organization, and every company is a data business at its core. However, in many organizations, data is typically spread across a number of different systems such as software as a service (SaaS) applications, operational databases, and data warehouses. Such data silos make it difficult to get unified views of the data in an organization and act in real time to derive the most value.

Ten years ago, we launched Amazon Kinesis Data Streams, the first cloud-native serverless streaming data service, to serve as the backbone for companies, to move data across system boundaries, breaking data silos. With data streaming, you can power data lakes running on Amazon Simple Storage Service (Amazon S3), enrich customer experiences via personalization, improve operational efficiency with predictive maintenance of machinery in your factories, and achieve better insights with more accurate machine learning (ML) models. Amazon Kinesis Data Streams is a foundational data strategy pillar for tens of thousands of customers. As streams of raw data come together, they unlock capabilities to continuously transform, enrich, and query data in real time via seamless integration with stream processing engines such as Amazon Managed Service for Apache Flink.

As an example, the National Hockey League (NHL) reimagined the fan experience by streaming live NHL EDGE game data and stats to offer hockey fans valuable insights to keep fans at the edge of their seats. NHL EDGE technology in the puck and players’ sweaters (jerseys) generate thousands of data points every second for the NHL, which can be analyzed by AWS to predict likely outcomes for key events like face-offs. To process and analyze thousands of signals, the NHL built a real-time streaming data foundation with Kinesis Data Streams and Amazon Managed Service for Apache Flink to stream, prepare, and feed data into ML models, helping inform face-off predictions in seconds and expanding new ways to engage viewers.

Building on such streaming data foundations, many customers are currently thinking about how to deliver transformative new products and services with generative AI. Streaming allows companies to connect the data available within data stores to large language models (LLMs) securely and in real time. Although LLMs are capable of working with billions of parameters, in order to deliver an engaging experience that is tailored to a company’s customers, LLMs require personalization data for the company’s users and proprietary knowledge stores within the company’s data stores. A data strategy that incorporates streaming is necessary to deliver personalization and proprietary data that is available for querying in real time.

Customers with real-time streaming data strategy are at the cutting edge of providing innovative products with generative AI. One customer adopted Kinesis Data Streams for their data strategy, and they stream billions of events from their digital products to derive real-time insights. With a combination of low-latency data streaming and analytics, they are able to understand and personalize the user experience via a seamlessly integrated, self-reliant system for experimentation and automated feedback. Earlier this year, building on their already strong data foundation, they launched an innovative digital media generative AI product. The same data foundation built on Kinesis Data Streams is used to continuously analyze how users interact with the generated content and helps the product team fine-tune the application.

Real-time streaming data technologies are essential for digital transformation. These services help customers bring data to their applications and models, making them smarter. Real-time data gives companies an advantage in data-driven decisions, predictions, and insights by using the data at the very moment it is generated, providing an unparalleled edge in a world where timing is the key to success. Bring the data in once, use it across your organization, and act before the value of that data diminishes.”

– Mindy Ferguson, VP of AWS Streaming and Messaging.

As we celebrate the tenth anniversary of Kinesis Data Streams, customers have shared four key reasons they continue to value this revolutionary service. They love how they can easily stream data with no underlying servers to provision or manage, operate at a massive scale with consistent performance, achieve high resiliency and durability, and benefit from broad integration with myriad sources and sinks to ingest and process data respectively.

Ease of use

Getting started with Kinesis Data Streams is straightforward: developers can create a data stream with a few clicks on the Kinesis Data Streams console or with a single API call. Changing the size or configuration is also a single API call, and each data stream comes with a default 24-hour data retention period. Developers don’t have to worry about clusters, version upgrades, or storage capacity planning. They just turn on a data stream and start ingesting data.

The needs of our customers have evolved in the past 10 years. As more events get captured and streamed, customers want their data streams to scale elastically without any operational overhead. In response, we launched On-Demand streams in 2021 to provide a simple and automatic scaling experience. With On-Demand streams, you let the service handle scaling up a stream’s capacity proactively, and you’re only charged for the actual data ingested, retrieved, and stored. As our customers continued to ask for more capabilities, we increased the ingestion throughput limit of each On-Demand stream from 200MB/s to 1GB/s in March 2023, and then to 2GB/s in October 2023, to accommodate higher throughput workloads. To continue innovating to be the easiest streaming data service to use, we actively listen to our customer use cases.

Canva is an online design and visual communication platform. As it has rapidly grown from 30 million to 135 million monthly users, it has built a streaming data platform at scale that is effortless to operate for driving product innovations and personalizing the user experience.

“Amazon Kinesis Data Streams and AWS Lambda are used throughout Canva’s logging platform, ingesting and processing over 60 billion log events per day. The combination of Kinesis Data Streams and Lambda has abstracted plenty of work that’s often required in managing a massive data pipeline, such as deploying and managing a fleet of servers, whilst also providing a highly scalable and reliable service. It has allowed us to focus on delivering a world-class product by building highly requested features rather than spending time on operational work.”

– Phoebe Zhou, Software Engineer at Canva.

Operate at massive scale with consistent performance

A fundamental requirement of a streaming data strategy is ingesting and processing large volumes of data with low latency. Kinesis Data Streams processes trillions of records per day across tens of thousands of customers. Customers run more than 3.5 million unique streams and process over 45 PB of data per day. Our largest customers ingest more than 15 GB per second of real-time data with individual streams. That’s equivalent to streaming multiple data points for every person on earth, every second! Even at this scale, all our customers still retrieve data within milliseconds of availability.

Customers also want to process the same data with multiple applications, with each deriving a different value, without worrying about one application impacting the read throughput of another. Enhanced Fan-out offers dedicated read throughput and low latency for each data consumer. This has enabled enterprise platform teams to provide real-time data to more teams and applications.

VMware Carbon Black uses Kinesis Data Streams to ingest petabytes of data every day to secure millions of customer endpoints. The team focuses on its expertise while AWS manages data streaming to meet growing customer traffic and needs in real time.

“When an individual customer’s data increases or decreases, we can use the elasticity of Amazon Kinesis Data Streams to scale compute up or down to process data reliably while effectively managing our cost. This is why Kinesis Data Streams is a good fit. The biggest advantage is the managed nature of our solution on AWS. This has shaped our architecture and helped us shift complexity elsewhere.”

– Stoyan Dimkov, Staff Engineer and Software Architect at VMware Carbon Black.

Learn more about the case study.

Provide resiliency and durability for data streaming

With burgeoning data, customers want more flexibility in processing and reprocessing data. For example, if an application that is consuming data goes offline for a period, teams want to ensure that they resume processing at a later time without data loss. Kinesis Data Streams provides a default 24-hour retention period, enabling you to select a specific timestamp from which to start processing records. With the extended retention feature, you can configure the data retention period to be up to 7 days.

Some industries like financial services and healthcare have stricter compliance requirements, so customers asked for even longer data retention periods to support these requirements. Therefore, we followed up with long-term storage that supports data retention for up to 1 year. Now, thousands of Kinesis Data Streams customers use these features to make their streaming applications more resilient and durable.

Mercado Libre, a leading ecommerce and payments platform in Latin America, relies on Kinesis Data Streams to power its streaming data strategy around payment processing, customer experience, and operations.

“With Amazon Kinesis Data Streams at the core, we process approximately 70 billion daily messages distributed across thousands of data producers. By leveraging Kinesis Data Streams and Amazon DynamoDB Streams, we’ve embraced an event-driven architecture and are able to swiftly respond to data changes.”

– Joaquin Fernandez, Senior Software Expert at Mercado Libre.

Access your data no matter where it lives

Our customers use a wide variety of tools and applications, and an organization’s data often resides in many places. Therefore, the ability to easily integrate data across an organization is crucial to derive timely insights. Developers use the Kinesis Producer Library, Kinesis Client Library, and AWS SDK to quickly build custom data producer and data consumer applications. Customers have expanded their data producers ranging from microservices to smart TVs and even cars. We have over 40 integrations with AWS services and third-party applications like Adobe Experience Platform and Databricks. As detailed in our whitepaper on building a modern data streaming architecture on AWS, Kinesis Data Streams serves as the backbone to serverless and real-time use cases such as personalization, real-time insights, Internet of Things (IoT), and event-driven architecture. Our recent integration with Amazon Redshift enables you to ingest hundreds of megabytes of data from Kinesis Data Streams into data warehouses in seconds. To learn more about how to use this integration to detect fraud in near-real time, refer to Near-real-time fraud detection using Amazon Redshift Streaming Ingestion with Amazon Kinesis Data Streams and Amazon Redshift ML.

Another integration launched in 2023 is with Amazon Monitron to power predictive maintenance management. You can now stream measurement data and the corresponding inference results to Kinesis Data Streams, coordinate predictive maintenance, and build an IoT data lake. For more details, refer to Generate actionable insights for predictive maintenance management with Amazon Monitron and Amazon Kinesis.

Next, let’s go back to the NHL use case where they combine IoT, data streaming, and machine learning.

The NHL Edge IQ powered by AWS is helping bring fans closer to the action with advanced analytics and new ML stats such as Face-off Probability and Opportunity Analysis.

“We use Amazon Kinesis Data Streams to process NHL EDGE data on puck and Player positions, face-off location, and the current game situation to decouple data producers from consuming applications. Amazon Managed Service for Apache Flink is used to run Flink applications and consumes data from Kinesis Data Streams to call the prediction model in Amazon SageMaker to deliver the real-time Face-off Probability metric. The probability results are also stored in Amazon S3 to continuously retrain the model in SageMaker. The success of this project led us to build the next metric, Opportunity Analysis, which delivers over 25 insights into the quality of the scoring opportunity presented by each shot on goal. Kinesis Data Streams and Amazon Managed Service for Apache Flink applications were critical to making live, in-game predictions, enabling the system to perform opportunity analysis calculations for up to 16 live NHL games simultaneously.”

– Eric Schneider, SVP, Software Engineering at National Hockey League.

Learn more about the case study.

The future of data is real time

The fusion of real-time data streaming and generative AI promises to be the cornerstone of our digitally connected world. Generative AI, empowered by a constant influx of real-time information from IoT devices, sensors, social media, and beyond, is becoming ubiquitous. From autonomous vehicles navigating dynamically changing traffic conditions to smart cities optimizing energy consumption based on real-time demand, the combination of AI and real-time data will underpin efficiency and innovation across industries. Ubiquitous, adaptive, and deeply integrated into our lives, these AI-driven applications will enhance convenience and address critical challenges such as climate change, healthcare, and disaster response by using the wealth of real-time insights at their disposal. With Kinesis Data Streams, organizations can build a solid data foundation, positioning you to quickly adopt new technologies and unlock new opportunities sooner—which we anticipate will be enormous.

Learn more about what our customers are doing with data streaming. If you would like a quick exploration of Kinesis Data Streams concepts and use cases, check out our Amazon Kinesis Data Streams 101 playlist. To get started with building your data streams, visit the Amazon Kinesis Data Streams Developer Guide.


About the author

Roy (KDS) Wang is a Senior Product Manager with Amazon Kinesis Data Streams. He is passionate about learning from and collaborating with customers to help organizations run faster and smarter. Outside of work, Roy strives to be a good dad to his new son and builds plastic model kits.

Amazon MSK Serverless now supports Kafka clients written in all programming languages

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/amazon-msk-serverless-now-supports-kafka-clients-written-in-all-programming-languages/

Amazon MSK Serverless is a cluster type for Amazon Managed Streaming for Apache Kafka (Amazon MSK) that is the most straightforward way to run Apache Kafka clusters without having to manage compute and storage capacity. With MSK Serverless, you can run your applications without having to provision, configure, or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring and moving them to even load across a cluster.

With today’s launch, MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. Administrators can simplify and standardize access control to Kafka resources using AWS Identity and Access Management (IAM). This support for IAM in Amazon MSK is based on SASL/OUATHBEARER, an open standard for authorization and authentication.

In this post, we show how you can connect your applications to MSK Serverless with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including Java, Python, Go, JavaScript, and .NET. Using IAM authentication and authorization is the preferred choice of many customers because you can secure Kafka resources just like you do with all other AWS services. Additionally, you get all the other benefits of IAM, such as temporary role-based credentials, precisely scoped permission policies, and more. Now you can use MSK Serverless with IAM-based authentication more broadly with the support for multiple languages.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. For example, you can create an IAM user and a policy that allows the user to write to a specific Kafka topic but restricts access to other resources without worrying about managing Kafka ACLs. After you provide the identity policies with the necessary permissions, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. With OAUTHBEARER support, you can build clients that can work across both Amazon MSK and other Kafka environments. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, Amazon MSK provides new code libraries for the following programming languages in the AWS GitHub repo:

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the bootstrap address along with its request to access Apache Kafka resources.
  3. The MSK Serverless cluster decodes the OATHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK Serverless clusters with no additional cost. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. You also need to make sure the security group associated with your MSK Serverless cluster has an inbound rule allowing the traffic from the client applications in the associated VPCs to the TCP port number 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBRARER authentication.

For this post, we use the Python programming language. We also use https://github.com/dpkp/kafka-python as our Kafka client library.

Amazon MSK provides you with a new code library per each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for Python with your Kafka client library, run the following command:
    $ pip install aws-msk-iam-sasl-signer-python

  2. Import the installed Amazon MSK IAM SASL signer library in your code:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import socket
    import time
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    class MSKTokenProvider():
            def token(self):
                token, _ = MSKAuthTokenProvider.generate_auth_token('<your aws region>')
                return token

  4. Specify security_protocol as SASL_SSL and sasl_mechanism as oauthbearer in your Python Kafka client properties, and pass the token provider in the configuration object:
    tp = MSKTokenProvider()
    
        producer = KafkaProducer(
            bootstrap_servers='<Amazon MSK Serverless bootstrap string>',
            security_protocol='SASL_SSL',
            sasl_mechanism='OAUTHBEARER',
            sasl_oauth_token_provider=tp,
            client_id=”my.kafka.client.unique.id”,
        )

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

MSK Serverless now supports writes and reads from Kafka clients written in all programming languages. You can run your applications without having to configure and manage the infrastructure or optimize clusters, and you pay for the data volume you stream and retain. MSK Serverless fully manages partitions, including monitoring, and ensures an even balance of partition distribution across brokers in the cluster (auto-balancing).

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the Cloud.

Your guide to AWS Analytics at AWS re:Invent 2023

Post Syndicated from Imtiaz Sayed original https://aws.amazon.com/blogs/big-data/your-guide-to-aws-analytics-at-aws-reinvent-2023/

Join the AWS Analytics team at AWS re:Invent this year, where new ideas and exciting innovations come together.

For those in the data world, this post provides a curated guide for all analytics sessions that you can use to quickly schedule and build your itinerary. Book your spot early for the sessions you do not want to miss. You can do this through the attendee portal, and if you cannot make it in person, get a free pass to watch the live sessions online.

We are raising the bar this year on learning while having fun! Visit us at the AWS Analytics Kiosk in the AWS Village at the Expo to discover the AWS Analytics Superhero in you, participate in a playful quiz and AWS book signing events. Watch this space for additional details.

2023 AWS Analytics Superheroes

We are excited to introduce the 2023 AWS Analytics Superheroes at this year’s re:Invent conference! Are you lightning fast and ultra-adaptable like Wire Weaver? A shapeshifting guardian and protector of data like Data Lynx? Or a digitally clairvoyant master of data insights like Cloud Sight? Join us at the Analytics kiosk to learn more about which AWS Analytics Superhero you are and receive superhero SWAG!

#AWSanalytics #awsfordata #awsreinvent2023

Keynotes

KEY002| Adam Selipsky (CEO, Amazon Web Services) | Tuesday, Nov. 28 | 8:30 AM – 10:30 AM (PDT)

Join Adam Selipsky, CEO of Amazon Web Services, as he shares his perspective on cloud transformation. He highlights innovations in data, infrastructure, and artificial intelligence and machine learning that are helping AWS customers achieve their goals faster, mine untapped potential, and create a better future.

KEY003| Swami Sivasubramanian (Vice President, Data and AI at AWS) | Nov. 29 | 8:30 AM – 10:30 AM (PDT)

A powerful relationship between humans, data, and AI is unfolding right before us. Generative AI is augmenting our productivity and creativity in new ways, while also being fueled by massive amounts of enterprise data and human intelligence. Join Swami Sivasubramanian, Vice President of Data and AI at AWS, to discover how you can use your company data to build differentiated generative AI applications and accelerate productivity for employees across your organization. Also hear from customer speakers with real-world examples of how they’ve used their data to support their generative AI use cases and create new experiences for their customers.

KEY005 | Dr. Werner Vogels (Vice President and Chief Technology Officer, Amazon.com) | Nov. 30 | 8:30 AM – 10:30 AM (PDT)

Join Dr. Werner Vogels, Amazon.com’s VP and CTO, for his twelfth re:Invent appearance. In his keynote, he covers best practices for designing resilient and cost-aware architectures. He also discusses why artificial intelligence is something every builder must consider when developing systems and the impact this will have in our world.

Analytics Innovation Talk

ANT219-INT | G2 Krishnamoorthy | (Vice President of Analytics) | Data drives transformation: Data foundations with AWS analytics | Nov. 30 | 2:00 PM – 3:00 PM (PDT)

Data is the differentiator that drives your current business needs while simultaneously preparing you for the future. As your company transforms, you need a data foundation for business applications, new technical innovations, and data-driven business initiatives. Join G2 Krishnamoorthy, Vice President of AWS Analytics, to discuss strategies for embedding analytics into your applications and ideas for building a data foundation that supports your business initiatives. With new capabilities for self-service and simpler builder experiences, you can democratize data access for line-of-business users, analysts, scientists, and engineers. Hear inspiring stories from adidas, GlobalFoundries, and University of California, Irvine.

Breakout sessions

re:Invent breakout sessions are lecture-style, 1-hour long sessions delivered by AWS experts, customers, and partners.

Monday, Nov 27 Tuesday, Nov 28 Wednesday, Nov 29 Thursday, Nov 30

8:30 AM – 9:30 AM (PDT)

Wynn

ANT321 | How Rocket Companies run their data science platform on AWS

11:30 AM – 12:30 PM (PDT)

Mandalay Bay

ANT325 | Amazon Redshift: A decade of innovation in cloud data warehousing

9:00 AM – 10:00 AM (PDT)

MGM Grand

ANT319 | Building an open source data strategy on AWS

12:30 PM – 1:30 PM (PDT)

Mandalay Bay

ANT326 | Set up a zero-ETL-based analytics architecture for your organizations

10:00 AM – 11:00 AM (PDT)

Mandalay Bay

SVS307 | Scaling serverless data processing with Amazon Kinesis and Apache Kafka

12:30 PM – 1:30 PM (PDT)

Mandalay Bay

ANT202| What’s new in Amazon DataZone

9:00 AM – 10:00 AM (PDT)

Wynn

ANT204 | What’s new with Amazon EMR and Amazon Athena

2:00 PM – 3:00 PM (PDT)

Mandalay Bay

ANT203 | What’s new in Amazon Redshift

2:30 PM – 3:30 PM (PDT)

Mandalay Bay

ANT208-S | Build cloud data management across analytics, ML and generative AI applications (sponsored by Informatica)

2:30 PM – 3:30 PM (PDT)

Ceasars Forum

ANT322 | Modernize analytics by moving your data warehouse to Amazon Redshift

10:00 AM – 11:00 AM (PDT)

Ceasars Forum

ANT209 | Optimizing TCO for business-critical analytics

2:30 PM – 3:30 PM (PDT)

Ceasars Forum

ANT210 | Improve your search with vector capabilities in Amazon OpenSearch Service

3:00 PM – 4:00 PM (PDT)

MGM Grand

COM308 | Serverless data streaming: Amazon Kinesis Data Streams and AWS Lambda

4:00 PM – 5:00 PM (PDT)

Ceasars Forum

ANT329 | Best practices for analytics and generative AI on AWS

10:30 AM – 11:30 AM (PDT)

Wynn

ANT324 | Easily and securely prepare, share, and query data

3:30 PM – 4:30 PM (PDT)

Mandalay Bay

ANT220 | What’s new with AWS data integration

3:00 PM – 4:00 PM (PDT)

Wynn

ANT320 | How Electronic Arts modernized its data platform with Amazon EMR

4:00 PM – 5:00 PM (PDT)

Mandalay Bay

ANT317 | How Rivian builds real-time analytics from electric vehicles

10:30 AM – 11:30 AM (PDT)

MGM Grand

ANT303 | What’s new in AWS Lake Formation

4:00 PM – 5:00 PM (PDT)

Caesars Forum

ANT301 | What’s new in Amazon OpenSearch Service

3:00 PM – 4:00 PM (PDT)

Mandalay Bay

BSI203 | Enhance your applications with Amazon QuickSight embedded analytics

.

11:30 AM – 12:30 PM (PDT)

Ceasars Forum

ANT318 | Accelerate innovation with end-to-end serverless data architecture

.

4:30 PM – 5:30 PM (PDT)

Wynn

ANT207 | Understand your data with business context

.

1:00 PM – 2:00 PM (PDT)

Venetian

ANT201 | Accelerate innovation with real-time data

.

5:30 PM – 6:30 PM (PDT)

MGM Grand

ANT350 | Security analytics and observability with Amazon OpenSearch Service

.

1:00 PM – 2:00 PM (PDT)

Ceasars Forum

BSI101 | Generative BI in Amazon QuickSight

.
. .

2:30 PM – 3:30 PM (PDT)

Ceasars Forum

BSI205 | What’s new with Amazon QuickSight

.
. .

5:30 PM – 6:30 PM (PDT)

Mandalay Bay

ANT205 | Curate your data at scale

.
. .

5:30 PM – 6:30 PM (PDT)

Mandalay Bay

ANT323 | Smarter, faster analytics with generative AI and ML

.

Chalk talks

Chalk talks are an hour-long, highly interactive content format with a small audience. Each begins with a short lecture delivered by an AWS expert, followed by a Q&A session with the audience.

Monday, Nov 27 Tuesday, Nov 28 Wednesday, Nov 29 Thursday, Nov 30

10:00 AM – 11:00 AM (PDT)

Wynn

OPN311 | Extending OpenSearch

11:00 AM – 12:00 PM (PDT)

Wynn

ANT336-R | Governed data sharing with Amazon DataZone and AWS Lake Formation [REPEAT]

10:00 AM – 11:00 AM (PDT)

Ceasars Forum

ANT316 | Fast-track streaming ETL with AWS streaming data services

11:00 AM – 12:00 PM (PDT)

Wynn

ANT347 | Using natural language to author data integration applications

11:30 AM – 12:30 PM (PDT)

Wynn

OPN308-R | Build and operate a Zero Trust Apache Kafka cluster [REPEAT]

11:00 AM – 12:00 PM (PDT)

Mandalay Bay

ANT338-R | Migrate legacy ETL to AWS Glue [REPEAT]

10:00 AM – 11:00 AM (PDT)

Wynn

ANT339 | Optimizing Apache Spark workloads with Amazon EMR Serverless

12:30 PM – 1:30 PM (PDT)

Mandalay Bay

ANT314 | Migrating self-managed Apache Flink to fully managed on AWS

2:30 PM – 3:30 PM (PDT)

Mandalay Bay

ANT333-R | Data integration with AWS Glue and Amazon MWAA [REPEAT]

11:30 AM – 12:30 PM (PDT)

Ceasars Forum

ANT328 | Accessing open table formats for superior data lake analytics

10:30 AM – 11:30 AM (PDT)

Ceasars Forum

STG328 | Powering your data lakes and analytics with Amazon S3

2:30 PM – 3:30 PM (PDT)

Mandalay Bay

ANT341 | Reduce downtime and optimize costs through data pipeline observability

4:00 PM – 5:00 PM (PDT)

MGM Grand

ANT315 | Accelerating value from data: Migrate from batch to stream processing

12:30 PM – 1:30 PM (PDT)

MGM Grand

BSI302 | Architecting governed BI for all users with Amazon QuickSight

11:30 AM – 12:30 PM (PDT)

Ceasars Forum

ANT340 | Powering observability with AI and Amazon OpenSearch Ingestion

2:30 PM – 3:30 PM (PDT)

Ceasars Forum

ANT342-R | Scaling analytics with data lakes and data warehouses [REPEAT]

4:00 PM – 5:00 PM (PDT)

Mandalay Bay

ANT311-R | Stream data and build transactional lakes with AWS Glue streaming [REPEAT]

12:30 PM – 1:30 PM (PDT)

Wynn

ANT403 | How to migrate your Apache Kafka workloads to Amazon MSK

2:30 PM – 3:30 PM (PDT)

Ceasars Forum

ANT346-R | Understanding zero-ETL with AWS analytics [REPEAT]

3:30 PM – 4:30 PM (PDT)

MGM Grand

STG336-R | Scale data lake access control and enhance governance with Amazon S3 [REPEAT]

4:30 PM – 5:30 PM (PDT)

Wynn

ANT337 | Identify and remediate security threats with Amazon OpenSearch Service

1:00 PM – 2:00 PM (PDT)

Ceasars Forum

ANT327-R | 5 steps to successfully migrating to Amazon OpenSearch Service [REPEAT]

2:30 PM – 3:30 PM (PDT)

Wynn

ANT334 | End-to-end data and machine learning governance on AWS

3:30 PM – 4:30 PM (PDT)

Ceasars Forum

ANT344 | Self-service analytics for all

.4:30 PM – 5:30 PM (PDT)

Mandalay Bay

BSI301-R | Architectural patterns for embedded analytics using Amazon QuickSight [REPEAT]

2:00 PM – 3:00 PM (PDT)

MGM Grand

BSI401 | DevOps strategies for Amazon QuickSight business intelligence assets

4:00 PM – 5:00 PM (PDT)

Ceasars Forum

ANT343 | Securely manage data across organizational boundaries

4:00 PM – 5:00 PM (PDT)

MGM Grand

TRV303 | Build a digital concierge for travelers and guests with generative AI

.

2:30 PM – 3:30 PM (PDT)

Mandalay Bay

ANT335 | Get the most out of your data warehousing workloads

. .
.

5:30 PM – 6:30 PM (PDT)

Ceasars Forum

ANT349-R | Advanced real-time analytics and ML in your data warehouse [REPEAT]

. .

Builders’ sessions

These are 1-hour small-group sessions with up to nine attendees per table and one AWS expert. Each builders’ session begins with a short explanation or demonstration of what you’re going to build. When the demonstration is complete, bring your laptop to experiment and build with the AWS expert.

Monday, Nov 27 Tuesday, Nov 28 Wednesday, Nov 29

2:30 PM – 3:30 PM (PDT)

Mandalay Bay

ANT313-R | Use data catalogs to improve self-service analytics [REPEAT]

11:00 AM – 12:00 PM (PDT)

Mandalay Bay

OPN401 | Apache Hudi on AWS: Tuning for cost and performance

4:00 PM – 5:00 PM (PDT)

Ceasars Forum

ANT308-R | Build large-scale transactional data lakes with Apache Iceberg on AWS [REPEAT]

.

11:00 AM – 12:00 PM (PDT)

MGM Grand

BSI206 | Scale enterprise BI securely with Amazon QuickSight

.

Workshops

Workshops are 2-hour interactive sessions where you work in teams or individually to solve problems using AWS services. Each workshop starts with a short lecture, and the rest of the time is spent working the problem. Bring your laptop to build along with AWS experts.

Monday, Nov 27 Tuesday, Nov 28 Wednesday, Nov 29

2:00 PM – 4:00 PM (PDT)

Mandalay Bay

BSI201 | Build dashboards, reports, and explore generative BI in Amazon QuickSight

11:00 AM – 1:00 PM (PDT)

MGM Grand

ANT306 | Build a data foundation to power your generative AI applications

11:30 AM – 1:30 PM (PDT)

Ceasars Forum

ANT312 | Using Amazon OpenSearch Service as a vector database for gen AI apps

2:30 PM – 4:30 PM (PDT)

Wynn

ANT305-R | Log analytics made easy with Amazon OpenSearch Serverless [REPEAT]

11:30 AM – 1:30 PM (PDT)

Mandalay Bay

ANT401-R | Event detection with Amazon MSK and Amazon Managed Service for Apache Flink [REPEAT]

.
.

11:30 AM – 1:30 PM (PDT)

MGM Grand

BSI202 | Quickly build predictive dashboards using no-code ML and generative BI

.
.

2:00 PM – 4:00 PM (PDT)

Wynn

ANT310 | Share data across Regions and organizations for near-real-time insights

.
.

11:30 AM – 1:30 PM (PDT)

Ceasars Forum

ANT304-R | A pragmatic approach to data governance on AWS [REPEAT]

.
.

11:30 AM – 1:30 PM (PDT)

Venetian

AIM350 | Personalized marketing content with generative AI and Amazon Personalize

.
.

12:00 PM – 2:00 PM (PDT)

Ceasars Forum

ANT402 | Protect and securely share the right data

.
.

12:00 PM – 2:00 PM (PDT)

Wynn

ANT307 | Connect and analyze all your data with zero-ETL approaches

.

Code talks

Code talks are similar to our popular chalk talk format, but instead of focusing on an architecture solution with whiteboarding, the speakers lead an interactive discussion featuring live coding or code samples. These sessions focus on the actual code that goes into building a solution. Attendees will learn the “why” behind the solution and see it come to life, and even the errors that are bound to happen. Attendees are encouraged to ask questions and follow along.

Monday, Nov 27 Tuesday, Nov 28 Wednesday, Nov 29 Thursday, Nov 30

11:30 AM – 12:30 PM (PDT)

Wynn

ANT332-R | Customize Amazon Athena to integrate with new data sources [REPEAT]

1:00 PM – 2:00 PM (PDT)

Wynn

ANT345 | Simplify working with data across multicloud with AWS analytics

5:30 PM – 6:30 PM (PDT)

Wynn

ANT405 | Build a Flink application on Amazon Managed Service for Apache Flink

4:00 PM – 5:00 PM (PDT)

Wynn

AIM366 | Data preparation for ML at scale with Amazon SageMaker notebooks

Conclusion

We hope this post acts as your go-to resource for navigating the AWS analytics track at re:Invent 2023. For staying in the know about the most recent trends and advancements in AWS Analytics, follow our dedicated LinkedIn page. Visit the Amazon QuickSight guide to learn what’s new in Business Intelligence.


About the authors

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Navnit Shukla serves as an AWS Specialist Solution Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled “Data Wrangling on AWS.” He can be reached via LinkedIn.

Create a modern data platform using the Data Build Tool (dbt) in the AWS Cloud

Post Syndicated from Prantik Gachhayat original https://aws.amazon.com/blogs/big-data/create-a-modern-data-platform-using-the-data-build-tool-dbt-in-the-aws-cloud/

Building a data platform involves various approaches, each with its unique blend of complexities and solutions. A modern data platform entails maintaining data across multiple layers, targeting diverse platform capabilities like high performance, ease of development, cost-effectiveness, and DataOps features such as CI/CD, lineage, and unit testing. In this post, we delve into a case study for a retail use case, exploring how the Data Build Tool (dbt) was used effectively within an AWS environment to build a high-performing, efficient, and modern data platform.

dbt is an open-source command line tool that enables data analysts and engineers to transform data in their warehouses more effectively. It does this by helping teams handle the T in ETL (extract, transform, and load) processes. It allows users to write data transformation code, run it, and test the output, all within the framework it provides. dbt enables you to write SQL select statements, and then it manages turning these select statements into tables or views in Amazon Redshift.

Use case

The Enterprise Data Analytics group of a large jewelry retailer embarked on their cloud journey with AWS in 2021. As part of their cloud modernization initiative, they sought to migrate and modernize their legacy data platform. The aim was to bolster their analytical capabilities and improve data accessibility while ensuring a quick time to market and high data quality, all with low total cost of ownership (TCO) and no need for additional tools or licenses.

dbt emerged as the perfect choice for this transformation within their existing AWS environment. This popular open-source tool for data warehouse transformations won out over other ETL tools for several reasons. dbt’s SQL-based framework made it straightforward to learn and allowed the existing development team to scale up quickly. The tool also offered desirable out-of-the-box features like data lineage, documentation, and unit testing. A crucial advantage of dbt over stored procedures was the separation of code from data—unlike stored procedures, dbt doesn’t store the code in the database itself. This separation further simplifies data management and enhances the system’s overall performance.

Let’s explore the architecture and learn how to build this use case using AWS Cloud services.

Solution overview

The following architecture demonstrates the data pipeline built on dbt to manage the Redshift data warehouse ETL process.

        Figure 1 : Modern data platform using AWS Data Services and dbt

This architecture consists of the following key services and tools:

  • Amazon Redshift was utilized as the data warehouse for the data platform, storing and processing vast amounts of structured and semi-structured data
  • Amazon QuickSight served as the business intelligence (BI) tool, allowing the business team to create analytical reports and dashboards for various business insights
  • AWS Database Migration Service (AWS DMS) was employed to perform change data capture (CDC) replication from various source transactional databases
  • AWS Glue was put to work, loading files from the SFTP location to the Amazon Simple Storage Service (Amazon S3) landing bucket and subsequently to the Redshift landing schema
  • AWS Lambda functioned as a client program, calling third-party APIs and loading the data into Redshift tables
  • AWS Fargate, a serverless container management service, was used to deploy the consumer application for source queues and topics
  • Amazon Managed Workflows for Apache Airflow (Amazon MWAA) was used to orchestrate different tasks of dbt pipelines
  • dbt, an open-source tool, was employed to write SQL-based data pipelines for data stored in Amazon Redshift, facilitating complex transformations and enhancing data modeling capabilities

Let’s take a closer look at each component and how they interact in the overall architecture to transform raw data into insightful information.

Data sources

As part of this data platform, we are ingesting data from diverse and varied data sources, including:

  • Transactional databases – These are active databases that store real-time data from various applications. The data typically encompasses all transactions and operations that the business engages in.
  • Queues and topics – Queues and topics come from various integration applications that generate data in real time. They represent an instantaneous stream of information that can be used for real-time analytics and decision-making.
  • Third-party APIs – These provide analytics and survey data related to ecommerce websites. This could include details like traffic metrics, user behavior, conversion rates, customer feedback, and more.
  • Flat files – Other systems supply data in the form of flat files of different formats. These files, stored in an SFTP location, might contain records, reports, logs, or other kinds of raw data that can be further processed and analyzed.

Data ingestion

Data from various sources are grouped into two major categories: real-time ingestion and batch ingestion.

Real-time ingestion uses the following services:

  • AWS DMS AWS DMS is used to create CDC replication pipelines from OLTP (Online Transaction Processing) databases. The data is loaded into Amazon Redshift in near-real time to ensure that the most recent information is available for analysis. You can also use Amazon Aurora zero-ETL integration with Amazon Redshift to ingest data directly from OLTP databases to Amazon Redshift.
  • Fargate Fargate is used to deploy Java consumer applications that ingest data from source topics and queues in real time. This real-time data consumption can help the business make immediate and data-informed decisions. You can also use Amazon Redshift Streaming Ingestion to ingest data from streaming engines like Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) into Amazon Redshift.

Batch ingestion uses the following services:

  • Lambda – Lambda is used as a client for calling third-party APIs and loading the resultant data into Redshift tables. This process has been scheduled to run daily, ensuring a consistent batch of fresh data for analysis.
  • AWS Glue – AWS Glue is used to load files into Amazon Redshift through the S3 data lake. You can also use features like auto-copy from Amazon S3 (feature under preview) to ingest data from Amazon S3 to Amazon Redshift. However, the focus of this post is more on data processing within Amazon Redshift, rather than on the data loading process. Data ingestion, whether real time or batch, forms the basis of any effective data analysis, enabling organizations to gather information from diverse sources and use it for insightful decision-making.

Data warehousing using Amazon Redshift

In Amazon Redshift, we’ve established three schemas, each serving as a different layer in the data architecture:

  • Landing layer – This is where all data ingested by our services initially lands. It’s raw, unprocessed data straight from the source.
  • Certified dataset (CDS) layer – This is the next stage, where data from the landing layer undergoes cleaning, normalization, and aggregation. The cleansed and processed data is stored in this certified dataset schema. It serves as a reliable, organized source for downstream data analysis.
  • User-friendly data mart (UFDM) layer – This final layer uses data from the CDS layer to create data mart tables. These are specifically tailored to support BI reports and dashboards as per the business requirements. The goal of this layer is to present the data in a way that is most useful and accessible for end-users.

This layered approach to data management allows for efficient and organized data processing, leading to more accurate and meaningful insights.

Data pipeline

dbt, an open-source tool, can be installed in the AWS environment and set up to work with Amazon MWAA. We store our code in an S3 bucket and orchestrate it using Airflow’s Directed Acyclic Graphs (DAGs). This setup facilitates our data transformation processes in Amazon Redshift after the data is ingested into the landing schema.

To maintain modularity and handle specific domains, we create individual dbt projects. The nature of the data reporting—real-time or batch—affects how we define our dbt materialization. For real-time reporting, we define materialization as a view, loading data into the landing schema using AWS DMS from database updates or from topic or queue consumers. For batch pipelines, we define materialization as a table, allowing data to be loaded from various types of sources.

In some instances, we have had to build data pipelines that extend from the source system all the way to the UFDM layer. This can be accomplished using Airflow DAGs, which we discuss further in the next section.

To wrap up, it’s worth mentioning that we deploy a dbt webpage using a Lambda function and enable a URL for this function. This webpage serves as a hub for documentation and data lineage, further bolstering the transparency and understanding of our data processes.

ETL job orchestration

In our data pipeline, we follow these steps for job orchestration:

  1. Establish a new Amazon MWAA environment. This environment serves as the central hub for orchestrating our data pipelines.
  2. Install dbt in the new Airflow environment by adding the following dependency to your requirements.txt:
    boto3>=1.17.54
    botocore>=1.20.54
    dbt-redshift>=1.3.0
    dbt-postgres>=1.3.0

  3. Develop DAGs with specific tasks that call upon dbt commands to carry out the necessary transformations. This step involves structuring our workflows in a way that captures dependencies among tasks and ensures that tasks run in the correct order. The following code shows how to define the tasks in the DAG:
    #imports..
    ...
    
    #Define the begin_exec tasks
    start = DummyOperator(
        task_id='begin_exec',
        dag=dag 
    )
    
    #Define 'verify_dbt_install' task to check if dbt was installed properly
    verify = BashOperator(
        task_id='verify_dbt_install',
        dag=dag,
        bash_command='''
            echo "checking dbt version....";             
            /usr/local/airflow/.local/bin/dbt --version;
            if [ $? -gt 0 ]; then
                pip install dbt-redshift>=1.3.0;
            else
                echo "dbt already installed";
            fi
            python --version;
            echo "listing dbt...";      
            rm -r /tmp/dbt_project_home;
            cp -R /usr/local/airflow/dags/dbt_project_home /tmp;
            ls /tmp/dbt_project_home/<your_dbt_project_name>;
        '''
    )
    
    #Define ‘landing_to_cds_task’ task to copy from landing schema to cds schema
    landing_to_cds_task = BashOperator(
        task_id='landing_to_cds_task', 
        dag = dag,
        bash_command='''        
            /usr/local/airflow/.local/bin/dbt run --project-dir /tmp/dbt_project_home/<your_dbt_project_name> --profiles-dir /tmp/dbt_project_home/ --select <model_folder_name>.*;
        '''
    )
    
    ...
    #Define data quality check task to test a package, generate docs and copy the docs to required S3 location
    data_quality_check = BashOperator(
        task_id='data_quality_check',
        dag=dag,
        bash_command='''    
       	  /usr/local/airflow/.local/bin/dbt test –-select your_package.*               
            /usr/local/airflow/.local/bin/dbt docs generate --project-dir /tmp/dbt_project_home/<your_project_name> --profiles-dir /tmp/dbt_project_home/;        
            aws s3 cp /tmp/dbt_project_home/<your_project_name>/target/ s3://<your_S3_bucket_name>/airflow_home/dags/dbt_project_home/<your_project_name>/target --recursive;
        '''
    )

  4. Create DAGs that solely focus on dbt transformation. These DAGs handle the transformation process within our data pipelines, harnessing the power of dbt to convert raw data into valuable insights.
    #This is how we define the flow 
    start >> verify >> landing_to_cds_task >> cds_to_ufdm_task >> data_quality_check >> end_exec

The following image shows how this workflow would be seen on the Airflow UI .

  1. Create DAGs with AWS Glue for ingestion. These DAGs use AWS Glue for data ingestion tasks. AWS Glue is a fully managed ETL service that makes it easy to prepare and load data for analysis. We create DAGs that orchestrate AWS Glue jobs for extracting data from various sources, transforming it, and loading it into our data warehouse.
          #Create boto3 client for Glue 
          glue_client = boto3.client('glue', region_name='us-east-1')
    
          #Define callback function to start the Glue job using boto3 client 
          def run_glue_ingestion_job():
       glue_client.start_job_run(JobName='glue_ingestion_job')  
    
    #Define the task for glue job for ingestion
       glue_job_step = PythonOperator(
           task_id=’glue_task_for_source_to_landing’, 
           python_callable=run_glue_ingestion_job
       )
    #This is how we define the flow 
    start >> verify >> glue_task_for_source_to_landing >> landing_to_cds_task >> cds_to_ufdm_task >> data_quality_check >> end_exec
    

The following image shows how this workflow would be seen on the Airflow UI.

  1. Create DAGs with Lambda for ingestion. Lambda lets us run code without provisioning or managing servers. These DAGs use Lambda functions to call third-party APIs and load data into our Redshift tables, which can be scheduled to run at certain intervals or in response to specific events.
    #Create boto3 client for Lambda 
    lambda_client = boto3.client('lambda')
    
    #Define callback function to invoke the lambda function using boto3 client 
    def run_lambda_ingestion_job():
       Lambda_client.invoke(FunctionName='<funtion_arn>')
    )  
    
    #Define the task for glue job for ingestion
    glue_job_step = PythonOperator(
       task_id=’lambda_task_for_api_to_landing’, 
       python_callable=run_lambda_ingestion_job
    )

The following image shows how this workflow would be seen on the Airflow UI.

We now have a comprehensive, well-orchestrated process that uses a variety of AWS services to handle different stages of our data pipeline, from ingestion to transformation.

Conclusion

The combination of AWS services and the dbt open-source project provides a powerful, flexible, and scalable solution for building modern data platforms. It’s a perfect blend of manageability and functionality, with its easy-to-use, SQL-based framework and features like data quality checks, configurable load types, and detailed documentation and lineage. Its principles of “code separate from data” and reusability make it a convenient and efficient tool for a wide range of users. This practical use case of building a data platform for a retail organization demonstrates the immense potential of AWS and dbt for transforming data management and analytics, paving the way for faster insights and informed business decisions.

For more information about using dbt with Amazon Redshift, see Manage data transformations with dbt in Amazon Redshift.


About the Authors

Prantik Gachhayat is an Enterprise Architect at Infosys having experience in various technology fields and business domains. He has a proven track record helping large enterprises modernize digital platforms and delivering complex transformation programs. Prantik specializes in architecting modern data and analytics platforms in AWS. Prantik loves exploring new tech trends and enjoys cooking.

Ashutosh Dubey is a Senior Partner Solutions Architect and Global Tech leader at Amazon Web Services based out of New Jersey, USA. He has extensive experience specializing in the Data and Analytics and AIML field including generative AI, contributed to the community by writing various tech contents, and has helped Fortune 500 companies in their cloud journey to AWS.

Amazon Aurora MySQL zero-ETL integration with Amazon Redshift is now generally available

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/amazon-aurora-mysql-zero-etl-integration-with-amazon-redshift-is-now-generally-available/

Data is at the center of every application, process, and business decision,” wrote Swami Sivasubramanian, VP of Database, Analytics, and Machine Learning at AWS, and I couldn’t agree more. A common pattern customers use today is to build data pipelines to move data from Amazon Aurora to Amazon Redshift. These solutions help them gain insights to grow sales, reduce costs, and optimize their businesses.

To help you focus on creating value from data instead of preparing data for analysis, we announced Amazon Aurora zero-ETL integration with Amazon Redshift at AWS re:Invent 2022 and in public preview for Amazon Aurora MySQL-Compatible Edition in June 2023.

Now generally available: Amazon Aurora MySQL zero-ETL integration with Amazon Redshift
Today, we announced the general availability of Amazon Aurora MySQL zero-ETL integration with Amazon Redshift. With this fully managed solution, you no longer need to build and maintain complex data pipelines in order to derive time-sensitive insights from your transactional data to inform critical business decisions.

This zero-ETL integration between Amazon Aurora and Amazon Redshift unlocks opportunities for you to run near real-time analytics and machine learning (ML) on petabytes of transactional data in Amazon Redshift. As this data gets written into Aurora, it will be available in Amazon Redshift within seconds.

It also enables you to run consolidated analytics from multiple Aurora MySQL database clusters in Amazon Redshift to derive holistic insights across many applications or partitions. Amazon Aurora MySQL zero-ETL integration with Amazon Redshift processes over 1 million transactions per minute (an equivalent of 17.5 million insert/update/delete row operations per minute) from multiple Aurora databases and makes them available in Amazon Redshift in less than 15 seconds (p50 latency lag).

Furthermore, you can take advantage of the analytics and built-in ML capabilities of Amazon Redshift, such as materialized views, cross-Region data sharing, and federated access to multiple data stores and data lakes.

Let’s get started
In this article, I’ll highlight some steps along with information on how you can get started easily. I will use my existing Amazon Aurora MySQL serverless database and Amazon Redshift data warehouse.

To get started, I need to navigate to Amazon RDS and select Create zero-ETL integration on the Zero-ETL integrations page.

On the Create zero-ETL integration page, I need to follow a few steps to configure the integration for my Amazon Aurora database cluster and my Amazon Redshift data warehouse.

First, I define an identifier for my integration and select Next.

On the next page, I need to select the source database by selecting Browse RDS databases.

Here, I can select my existing database as the source.

The next step asks me the target Amazon Redshift data warehouse. Here, I have the flexibility to choose the Amazon Redshift Serverless or RA3 data warehouse in my account or in different account. I select Browse Redshift data warehouses.

Then, I choose the target data warehouse.

Because Amazon Aurora needs to replicate into the data warehouse, we need to add an additional resource policy and add the Aurora database as an authorized integration source in the Amazon Redshift data warehouse.

I can solve this by manually updating in the Amazon Redshift console or let Amazon RDS fix it for me. I tick the checkbox.

On the next page, it shows me the changes that Amazon RDS will perform for us. I select Continue.

On the next page, I can configure the tags and also the encryption. By default, zero-ETL integration encrypts your data using AWS Key Management Service (AWS KMS), and I have the option to use my own key.

Then, I need to review all the configurations and select Create zero-ETL integration to create the integration.

After a few minutes, my zero-ETL integration is sucessfully created. Then, I switch to Amazon Redshift, and on the Zero-ETL integrations page, I can see that I have my recently created zero-ETL integration.

Since the integration does not yet have a target database inside Amazon Redshift, I need to create one.

Now the integration configuration is complete. On this page, I can see the integration status is active, and there is one table that has been replicated.

For testing, I create a new table in my Amazon Aurora database and insert a record into this table.

Then I switched to the Redshift query editor v2 inside Amazon Redshift. Here I can make a connection to the database that I formed as part of the integration. By running a simple query, I can see that my data is already available inside Amazon Redshift.

I found this zero-ETL integration very convenient for two reasons. First, I could unify all data from multiple database clusters together and analyze it in aggregate. Second, within seconds of the transactional data being written into Amazon Aurora MySQL, this zero-ETL integration seamlessly made the data available in Amazon Redshift.

Things to know

Availability – Amazon Aurora zero-ETL integration with Amazon Redshift is available in US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm).

Supported Database Engines – Amazon Aurora zero-ETL Integration with Amazon Redshift currently supports MySQL-compatible editions of Amazon Aurora. Support for Amazon Aurora PostgreSQL-Compatible Edition is a work in progress.

Pricing –  Amazon Aurora zero-ETL integration with Amazon Redshift is provided at no additional cost. You pay for existing Amazon Aurora and Amazon Redshift resources used to create and process the change data created as part of a zero-ETL integration.

We’re one step closer to helping you focus more on creating value from data instead of preparing it for analysis. To learn more on how to get started, please visit the Amazon Aurora MySQL zero-ETL integration with Amazon Redshift page.

Happy integrating!
— Donnie

Deploy Amazon QuickSight dashboards to monitor AWS Glue ETL job metrics and set alarms

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/deploy-amazon-quicksight-dashboards-to-monitor-aws-glue-etl-job-metrics-and-set-alarms/

No matter the industry or level of maturity within AWS, our customers require better visibility into their AWS Glue usage. Better visibility can lend itself to gains in operational efficiency, informed business decisions, and further transparency into your return on investment (ROI) when using the various features available through AWS Glue.

As your company grows, you should be able to answer simple questions about your AWS Glue usage, such as the following:

  • Where am I spending the most with AWS Glue?
  • Where can I save the most by taking advantage of new AWS Glue features?
  • What does my overall usage look like using AWS Glue?

AWS offers services such as Amazon QuickSight, a serverless business intelligence (BI) service that lets you centralize this view and even ask natural language questions of your data, using Amazon QuickSight Q. QuickSight can give business leaders and their technology counterparts a common landscape for reporting important details of their usage, providing automated narratives to bridge communication gaps.

In this post, we explore how to combine AWS Glue usage information and metrics with centralized reporting and visualization using QuickSight. This can provide you with a more comprehensive view of your usage and tools to help you dive deep into your AWS Glue job run environment. You have metrics available per job run within the AWS Glue console, but they don’t cover all available AWS Glue job metrics, and the visuals aren’t as interactive compared to the QuickSight dashboard.

Although we don’t cover optimizing your jobs for costs in this post, you can refer to Monitor and optimize cost on AWS Glue for Apache Spark to learn how to fine-tune your AWS Glue jobs for performance, efficiency ,and cost-optimization.

Let’s dive in!

Solution overview

The following diagram illustrates the architecture for the given solution. At a high level, a scheduled event triggers an orchestration flow consisting of multiple data, compute, and analytics resources—the output of which culminates as a set of visuals in a BI dashboard.

solution architecture

Now let’s dig into the technical details involved in this solution.

An AWS Step Functions workflow is scheduled to run once per hour through Amazon EventBridge, which triggers an AWS Lambda function that calls the AWS Glue GetJob and GetJobRun APIs. We parse this data to check for jobs that have succeeded, stopped, or failed in the past hour, as well as any streaming jobs. The metadata is extracted from each job run, including information like runtime, start time, end time, auto scaling, number of workers, and worker type, and is written to an Amazon DynamoDB table with TTL (time to live) enabled to ensure the table doesn’t grow too large.

We move into a parallel state to check two tables that Amazon Athena writes the output of the federated queries to. Athena first checks to make sure the tables exist in Amazon Simple Storage Service (Amazon S3), where the data will be stored. If the tables don’t exist, Athena creates them. One federated query gathers AWS Glue metric data from Amazon CloudWatch metrics; the other gathers data from the DynamoDB table where Lambda writes the AWS Glue job metadata it’s collecting. Both federated queries utilize appropriate filtering in order to only scan the necessary data from each source.

There is a choice state for each branch. If there is no new data to be added to a table in Amazon S3, the state ends and waits for the other to complete. For example, there could be an AWS Glue job that is running while the step is evaluating. In this case, the metrics for the job would be inserted in the table on Amazon S3, but the metadata from DynamoDB wouldn’t arrive until the following hour after the job has succeeded, stopped, or failed.

When new metrics or metadata are found, Athena inserts this data to the metrics or metadata tables in Amazon S3, which are both partitioned by the hour. After the data is inserted, the final steps call the QuickSight CreateIngestion API, which triggers data ingestion into QuickSight SPICE to power interactive analysis. At this point, the workflow has finished running and will run again the following hour.

In the following sections, we show you how to set up the solution, explore the dashboards, and configure alarms.

The code for this solution can be found at the AWS samples GitHub repository.

Prerequisites

You should have the following prerequisites:

Deploy solution resources with the AWS CDK

To provision the resources that build the dashboard and keep it up to date, we provide steps to download and deploy the solution via the AWS CDK. The solution was developed with cost-optimization as a priority, but some resources in the stack will incur costs once deployed.

This solution generates the following resources:

  • IAM role
  • EventBridge rule
  • Step Functions state machine
  • Lambda function
  • S3 bucket
  • Two AWS Glue tables and one AWS Glue database
  • DynamoDB table
  • Athena queries invoked by Step Functions
  • QuickSight data source, dataset, analysis, and dashboard

To deploy the solution, complete the following steps:

  1. Clone the source code from AWS samples GitHub repository to the client:
    git clone https://github.com/aws-samples/glue-metrics-in-quicksight

  2. Bootstrap your AWS CDK app:
    cd glue-metrics-in-quicksight
    npm i aws-cdk-lib
    cdk bootstrap

  3. Deploy the solution with the required parameters:
    1. The first parameter is for a new S3 bucket to be created, which holds the AWS Glue metrics and metadata.
    2. The second parameter is required in order for QuickSight to assign permissions to the user who will manage the assets. Refer to Managing user access inside Amazon QuickSight to find your existing QuickSight users.
      cdk deploy --parameters BucketName=New-Unique-Bucket-Name --parameters QuicksightUsername=QuickSight-Existing-User

If your deployment fails, make sure you installed the AWS CDK library and rerun cdk deploy after installing:

npm i aws-cdk-lib

The deployment may take up to 10 minutes.

After the solution is deployed, the Step Functions state machine will evaluate once per hour if it should ingest data into QuickSight. You can run some AWS Glue jobs after the stack is deployed and check the QuickSight dashboard in the next hour or two, where the job metadata and metrics will be populated for your analysis.

Explore the dashboard

The dashboard contains two sheets: Glue Jobs and Glue Metrics.

The Glue Jobs sheet includes all of the metadata about your AWS Glue job runs, including AWS Glue for Apache Spark, AWS Glue for Ray, and AWS Glue streaming ETL. Most of the visuals also have a hierarchy that you can drill down into with QuickSight, going as low as each specific job run ID. You can use controls to filter by date, job name, and job run ID.

In the following demonstration, you will see the pivot table, which is a simple view of all our job metadata, including estimated cost per job and job run. We open up a job name and see the different job runs. There is one individual job run that we would like to inspect the metrics on, so we choose the job name and choose View metrics for job run id: <my job run id>. This will take us to the Glue Metrics sheet and automatically filter for the job run ID we want to view.

glue information sheet

The Glue Metrics sheet is built to reflect the documentation we provide in AWS Glue resource monitoring. This documentation helps explain each visual in the dashboard. You can use the Glue Metrics sheet to view aggregated metrics across all jobs, a single job, or down to the job run ID.

To populate the Glue Metrics sheet, your AWS Glue jobs must be enabled to capture metrics in CloudWatch.

glue metrics sheet

Set up alerts

Setting up alerts on measures is also straightforward to do in QuickSight. To do so, choose (right-click) one of the tracked measures on either worksheet and choose Create Alarm. This will bring you to the configuration page to set up the metric you’d like to be alerted on.

quicksight alarm

The dashboard is designed to give you the freedom to alter it and make your own visualizations with the metadata and metrics that are provided to you. If you want even more insight into cost, consider deploying the CUDOS dashboard as well!

Clean up

If you no longer need the dashboard, delete the CDK app:

cdk destroy

Conclusion

In this post, we talked about the importance of having observability of your AWS Glue jobs and provided an AWS CDK app that deploys a QuickSight dashboard for you. We hope this helps you optimize your AWS Glue environment using the insights the dashboard provides. To learn about event-based alerting for your AWS Glue for Apache Spark and Ray jobs, refer to Automate alerting and reporting for AWS Glue job resource usage.


About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers in the south east modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Cody Penta is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in security and CDK, and enjoys solving the really difficult problems in the technology world. Off the clock, he loves relaxing in the mountains, coding personal projects, and gaming.

Angus Ferguson is a Solutions Architect at AWS who is passionate about meeting customers across the world, helping them solve their technical challenges. Angus specializes in Data & Analytics with a focus on customers in the financial services industry.

GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless

Post Syndicated from Mukul Sharma original https://aws.amazon.com/blogs/big-data/godaddy-benchmarking-results-in-up-to-24-better-price-performance-for-their-spark-workloads-with-aws-graviton2-on-amazon-emr-serverless/

This is a guest post co-written with Mukul Sharma, Software Development Engineer, and Ozcan IIikhan, Director of Engineering from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 22 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

GoDaddy is a data-driven company, and getting meaningful insights from data helps us drive business decisions to delight our customers. At GoDaddy, we embarked on a journey to uncover the efficiency promises of AWS Graviton2 on Amazon EMR Serverless as part of our long-term vision for cost-effective intelligent computing.

In this post, we share the methodology and results of our benchmarking exercise comparing the cost-effectiveness of EMR Serverless on the arm64 (Graviton2) architecture against the traditional x86_64 architecture. EMR Serverless on Graviton2 demonstrated an advantage in cost-effectiveness, resulting in significant savings in total run costs. We achieved 23.85% improvement in price-performance for sample production Spark workloads—an outcome that holds tremendous potential for businesses striving to maximize their computing efficiency.

Solution overview

GoDaddy’s intelligent compute platform envisions simplification of compute operations for all personas, without limiting power users, to ensure out-of-box cost and performance optimization for data and ML workloads. As a part of this vision, GoDaddy’s Data & ML Platform team plans to use EMR Serverless as one of the compute solutions under the hood.

The following diagram shows a high-level illustration of the intelligent compute platform vision.

Benchmarking EMR Serverless for GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

At GoDaddy, we embarked on a comprehensive study to benchmark EMR Serverless using real production workflows at GoDaddy. The purpose of the study was to evaluate the performance and efficiency of EMR Serverless and develop a well-informed adoption plan. The results of the study have been extremely promising, showcasing the potential of EMR Serverless for our workloads.

Having achieved compelling results in favor of EMR Serverless for our workloads, our attention turned to evaluating the utilization of the Graviton2 (arm64) architecture on EMR Serverless. In this post, we focus on comparing the performance of Graviton2 (arm64) with the x86_64 architecture on EMR Serverless. By conducting this apples-to-apples comparative analysis, we aim to gain valuable insights into the benefits and considerations of using Graviton2 for our big data workloads.

By using EMR Serverless and exploring the performance of Graviton2, GoDaddy aims to optimize their big data workflows and make informed decisions regarding the most suitable architecture for their specific needs. The combination of EMR Serverless and Graviton2 presents an exciting opportunity to enhance the data processing capabilities and drive efficiency in our operations.

AWS Graviton2

The Graviton2 processors are specifically designed by AWS, utilizing powerful 64-bit Arm Neoverse cores. This custom-built architecture provides a remarkable boost in price-performance for various cloud workloads.

In terms of cost, Graviton2 offers an appealing advantage. As indicated in the following table, the pricing for Graviton2 is 20% lower compared to the x86 architecture option.

   x86_64  arm64 (Graviton2) 
per vCPU per hour $0.052624 $0.042094
per GB per hour $0.0057785 $0.004628
per storage GB per hour* $0.000111

*Ephemeral storage: 20 GB of ephemeral storage is available for all workers by default—you pay only for any additional storage that you configure per worker.

For specific pricing details and current information, refer to Amazon EMR pricing.

AWS benchmark

The AWS team performed benchmark tests on Spark workloads with Graviton2 on EMR Serverless using the TPC-DS 3 TB scale performance benchmarks. The summary of their analysis are as follows:

  • Graviton2 on EMR Serverless demonstrated an average improvement of 10% for Spark workloads in terms of runtime. This indicates that the runtime for Spark-based tasks was reduced by approximately 10% when utilizing Graviton2.
  • Although the majority of queries showcased improved performance, a small subset of queries experienced a regression of up to 7% on Graviton2. These specific queries showed a slight decrease in performance compared to the x86 architecture option.
  • In addition to the performance analysis, the AWS team considered the cost factor. Graviton2 is offered at a 20% lower cost than the x86 architecture option. Taking this cost advantage into account, the AWS benchmark set yielded an overall 27% better price-performance for workloads. This means that by using Graviton2, users can achieve a 27% improvement in performance per unit of cost compared to the x86 architecture option.

These findings highlight the significant benefits of using Graviton2 on EMR Serverless for Spark workloads, with improved performance and cost-efficiency. It showcases the potential of Graviton2 in delivering enhanced price-performance ratios, making it an attractive choice for organizations seeking to optimize their big data workloads.

GoDaddy benchmark

During our initial experimentation, we observed that arm64 on EMR Serverless consistently outperformed or performed on par with x86_64. One of the jobs showed a 7.51% increase in resource usage on arm64 compared to x86_64, but due to the lower price of arm64, it still resulted in a 13.48% cost reduction. In another instance, we achieved an impressive 43.7% reduction in run cost, attributed to both the lower price and reduced resource utilization. Overall, our initial tests indicated that arm64 on EMR Serverless delivered superior price-performance compared to x86_64. These promising findings motivated us to conduct a more comprehensive and rigorous study.

Benchmark results

To gain a deeper understanding of the value of Graviton2 on EMR Serverless, we conducted our study using real-life production workloads from GoDaddy, which are scheduled to run at a daily cadence. Without any exceptions, EMR Serverless on arm64 (Graviton2) is significantly more cost-effective compared to the same jobs run on EMR Serverless on the x86_64 architecture. In fact, we recorded an impressive 23.85% improvement in price-performance across the sample GoDaddy jobs using Graviton2.

Like the AWS benchmarks, we observed slight regressions of less than 5% in the total runtime of some jobs. However, given that these jobs will be migrated from Amazon EMR on EC2 to EMR Serverless, the overall total runtime will still be shorter due to the minimal provisioning time in EMR Serverless. Additionally, across all jobs, we observed an average speed up of 2.1% in addition to the cost savings achieved.

These benchmarking results provide compelling evidence of the value and effectiveness of Graviton2 on EMR Serverless. The combination of improved price-performance, shorter runtimes, and overall cost savings makes Graviton2 a highly attractive option for optimizing big data workloads.

Benchmarking methodology

As an extension of a larger benchmarking EMR Serverless for GoDaddy study, where we divided Spark jobs into brackets based on total runtime (quick-run, medium-run, long-run), we measured effect of architecture (arm64 vs. x86_64) on total cost and total runtime. All other parameters were kept the same to achieve an apples-to-apples comparison.

The team followed these steps:

  1. Prepare the data and environment.
  2. Choose two random production jobs from each job bracket.
  3. Make necessary changes to avoid inference with actual production outputs.
  4. Run tests to execute scripts over multiple iterations to collect accurate and consistent data points.
  5. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  6. Gather relevant metrics from the tests.
  7. Analyze results to draw insights and conclusions.

The following table shows the summary of an example Spark job.

Metric  EMR Serverless (Average) – X86_64  EMR Serverless (Average) – Graviton  X86_64 vs Graviton (% Difference) 
Total Run Cost $2.76 $1.85 32.97%

Total Runtime

(hh:mm:ss)

00:41:31 00:34:32 16.82%
EMR Release Label emr-6.9.0
Job Type Spark
Spark Version Spark 3.3.0
Hadoop Distribution Amazon 3.3.3
Hive/HCatalog Version Hive 3.1.3, HCatalog 3.1.3

Summary of results

The following table presents a comparison of job performance between EMR Serverless on arm64 (Graviton2) and EMR Serverless on x86_64. For each architecture, every job was run at least three times to obtain the accurate average cost and runtime.

 Job  Average x86_64 Cost Average arm64 Cost Average x86_64 Runtime (hh:mm:ss) Average arm64 Runtime (hh:mm:ss)  Average Cost Savings %  Average Performance Gain % 
1 $1.64 $1.25 00:08:43 00:09:01 23.89% -3.24%
2 $10.00 $8.69 00:27:55 00:28:25 13.07% -1.79%
3 $29.66 $24.15 00:50:49 00:53:17 18.56% -4.85%
4 $34.42 $25.80 01:20:02 01:24:54 25.04% -6.08%
5 $2.76 $1.85 00:41:31 00:34:32 32.97% 16.82%
6 $34.07 $24.00 00:57:58 00:51:09 29.57% 11.76%
Average  23.85% 2.10%

Note that the improvement calculations are based on higher-precision results for more accuracy.

Conclusion

Based on this study, GoDaddy observed a significant 23.85% improvement in price-performance for sample production Spark jobs utilizing the arm64 architecture compared to the x86_64 architecture. These compelling results have led us to strongly recommend internal teams to use arm64 (Graviton2) on EMR Serverless, except in cases where there are compatibility issues with third-party packages and libraries. By adopting an arm64 architecture, organizations can achieve enhanced cost-effectiveness and performance for their workloads, contributing to more efficient data processing and analytics.


About the Authors

Mukul Sharma is a Software Development Engineer on Data & Analytics (DnA) organization at GoDaddy. He is a polyglot programmer with experience in a wide array of technologies to rapidly deliver scalable solutions. He enjoys singing karaoke, playing various board games, and working on personal programming projects in his spare time.

Ozcan Ilikhan is a Director of Engineering on Data & Analytics (DnA) organization at GoDaddy. He is passionate about solving customer problems and increasing efficiency using data and ML/AI. In his spare time, he loves reading, hiking, gardening, and working on DIY projects.

Harsh Vardhan Singh Gaur is an AWS Solutions Architect, specializing in analytics. He has over 6 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Ramesh Kumar Venkatraman is a Senior Solutions Architect at AWS who is passionate about containers and databases. He works with AWS customers to design, deploy, and manage their AWS workloads and architectures. In his spare time, he loves to play with his two kids and follows cricket.

How to visualize Amazon Security Lake findings with Amazon QuickSight

Post Syndicated from Mark Keating original https://aws.amazon.com/blogs/security/how-to-visualize-amazon-security-lake-findings-with-amazon-quicksight/

In this post, we expand on the earlier blog post Ingest, transform, and deliver events published by Amazon Security Lake to Amazon OpenSearch Service, and show you how to query and visualize data from Amazon Security Lake using Amazon Athena and Amazon QuickSight. We also provide examples that you can use in your own environment to visualize your data. This post is the second in a multi-part blog series on visualizing data in QuickSight and provides an introduction to visualizing Security Lake data using QuickSight. The first post in the series is Aggregating, searching, and visualizing log data from distributed sources with Amazon Athena and Amazon QuickSight.

With the launch of Amazon Security Lake, it’s now simpler and more convenient to access security-related data in a single place. Security Lake automatically centralizes security data from cloud, on-premises, and custom sources into a purpose-built data lake stored in your account, and removes the overhead related to building and scaling your infrastructure as your data volumes increase. With Security Lake, you can get a more complete understanding of your security data across your entire organization. You can also improve the protection of your workloads, applications, and data.

Security Lake has adopted the Open Cybersecurity Schema Framework (OCSF), an open standard. With OCSF support, the service can normalize and combine security data from AWS and a broad range of enterprise security data sources. Using the native ingestion capabilities of the service to pull in AWS CloudTrail, Amazon Route 53, VPC Flow Logs, or AWS Security Hub findings, ingesting supported third-party partner findings, or ingesting your own security-related logs, Security Lake provides an environment in which you can correlate events and findings by using a broad range of tools from the AWS and APN partner community.

Many customers have already deployed and maintain a centralized logging solution using services such as Amazon OpenSearch Service or a third-party security information and event management (SIEM) tool, and often use business intelligence (BI) tools such as Amazon QuickSight to gain insights into their data. With Security Lake, you have the freedom to choose how you analyze this data. In some cases, it may be from a centralized team using OpenSearch or a SIEM tool, and in other cases it may be that you want the ability to give your teams access to QuickSight dashboards or provide specific teams access to a single data source with Amazon Athena.

Before you get started

To follow along with this post, you must have:

  • A basic understanding of Security Lake, Athena, and QuickSight
  • Security Lake already deployed and accepting data sources
  • An existing QuickSight deployment that can be used to visualize Security Lake data, or an account where you can sign up for QuickSight to create visualizations

Accessing data

Security Lake uses the concept of data subscribers when it comes to accessing your data. A subscriber consumes logs and events from Security Lake, and supports two types of access:

  • Data access — Subscribers can directly access Amazon Simple Storage Service (Amazon S3) objects and receive notifications of new objects through a subscription endpoint or by polling an Amazon Simple Queue Service (Amazon SQS) queue. This is the architecture typically used by tools such as OpenSearch Service and partner SIEM solutions.
  • Query access — Subscribers with query access can directly query AWS Lake Formation tables in your S3 bucket by using services like Athena. Although the primary query engine for Security Lake is Athena, you can also use other services that integrate with AWS Glue, such as Amazon Redshift Spectrum and Spark SQL.

In the sections that follow, we walk through how to configure cross-account sharing from Security Lake to visualize your data with QuickSight, and the associated Athena queries that are used. It’s a best practice to isolate log data from visualization workloads, and we recommend using a separate AWS account for QuickSight visualizations. A high-level overview of the architecture is shown in Figure 1.

Figure 1: Security Lake visualization architecture overview

Figure 1: Security Lake visualization architecture overview

In Figure 1, Security Lake data is being cataloged by AWS Glue in account A. This catalog is then shared to account B by using AWS Resource Access Manager. Users in account B are then able to directly query the cataloged Security Lake data using Athena, or get visualizations by accessing QuickSight dashboards that use Athena to query the data.

Configure a Security Lake subscriber

The following steps guide you through configuring a Security Lake subscriber using the delegated administrator account.

To configure a Security Lake subscriber

  1. Sign in to the AWS Management Console and navigate to the Amazon Security Lake console in the Security Lake delegated administrator account. In this post, we’ll call this Account A.
  2. Go to Subscribers and choose Create subscriber.
  3. On the Subscriber details page, enter a Subscriber name. For example, cross-account-visualization.
  4. For Log and event sources, select All log and event sources. For Data access method, select Lake Formation.
  5. Add the Account ID for the AWS account that you’ll use for visualizations. In this post, we’ll call this Account B.
  6. Add an External ID to configure secure cross-account access. For more information, see How to use an external ID when granting access to your AWS resources to a third party.
  7. Choose Create.

Security Lake creates a resource share in your visualizations account using AWS Resource Access Manager (AWS RAM). You can view the configuration of the subscriber from Security Lake by selecting the subscriber you just created from the main Subscribers page. It should look like Figure 2.

Figure 2: Subscriber configuration

Figure 2: Subscriber configuration

Note: your configuration might be slightly different, based on what you’ve named your subscriber, the AWS Region you’re using, the logs being ingested, and the external ID that you created.

Configure Athena to visualize your data

Now that the subscriber is configured, you can move on to the next stage, where you configure Athena and QuickSight to visualize your data.

Note: In the following example, queries will be against Security Hub findings, using the Security Lake table in the ap-southeast-2 Region. If necessary, change the table name in your queries to match the Security Lake Region you use in the following configuration steps.

To configure Athena

  1. Sign in to your QuickSight visualization account (Account B).
  2. Navigate to the AWS Resource Access Manager (AWS RAM) console. You’ll see a Resource share invitation under Shared with me in the menu on the left-hand side of the screen. Choose Resource shares to go to the invitation.
    Figure 3: RAM menu

    Figure 3: RAM menu

  3. On the Resource shares page, select the name of the resource share starting with LakeFormation-V3, and then choose Accept resource share. The Security Lake Glue catalog is now available to Account B to query.
  4. For cross-account access, you should create a database to link the shared tables. Navigate to Lake Formation, and then under the Data catalog menu option, select Databases, then select Create database.
  5. Enter a name, for example security_lake_visualization, and keep the defaults for all other settings. Choose Create database.
    Figure 4: Create database

    Figure 4: Create database

  6. After you’ve created the database, you need to create resource links from the shared tables into the database. Select Tables under the Data catalog menu option. Select one of the tables shared by Security Lake by selecting the table’s name. You can identify the shared tables by looking for the ones that start with amazon_security_lake_table_.
  7. From the Actions dropdown list, select Create resource link.
    Figure 5: Creating a resource link

    Figure 5: Creating a resource link

  8. Enter the name for the resource link, for example amazon_security_lake_table_ap_southeast_2_sh_findings_1_0, and then select the security_lake_visualization database created in the previous steps.
  9. Choose Create. After the links have been created, the names of the resource links will appear in italics in the list of tables.
  10. You can now select the radio button next to the resource link, select Actions, and then select View data under Table. This takes you to the Athena query editor, where you can now run queries on the shared Security Lake tables.
    Figure 6: Viewing data to query

    Figure 6: Viewing data to query

    To use Athena for queries, you must configure an S3 bucket to store query results. If this is the first time Athena is being used in your account, you’ll receive a message saying that you need to configure an S3 bucket. To do this, choose Edit settings in the information notice and follow the instructions.

  11. In the Editor configuration, select AwsDataCatalog from the Data source options. The Database should be the database you created in the previous steps, for example security_lake_visualization.
  12. After selecting the database, copy the query that follows and paste it into your Athena query editor, and then choose Run. This runs your first query to list 10 Security Hub findings:
    Figure 7: Athena data query editor

    Figure 7: Athena data query editor

    SELECT * FROM 
    "AwsDataCatalog"."security_lake_visualization"."amazon_security_lake_table_ap_southeast_2_sh_findings_1_0" limit 10;

    This queries Security Hub data in Security Lake from the Region you specified, and outputs the results in the Query results section on the page. For a list of example Security Lake specific queries, see the AWS Security Analytics Bootstrap project, where you can find example queries specific to each of the Security Lake natively ingested data sources.

  13. To build advanced dashboards, you can create views using Athena. The following is an example of a view that lists 100 findings with failed checks sorted by created_time of the findings.
    CREATE VIEW security_hub_new_failed_findings_by_created_time AS
    SELECT
    finding.title, cloud.account_uid, compliance.status, metadata.product.name
    FROM "security_lake_visualization"."amazon_security_lake_table_ap_southeast_2_sh_findings_1_0"
    WHERE compliance.status = 'FAILED'
    ORDER BY finding.created_time
    limit 100;

  14. You can now query the view to list the first 10 rows using the following query.
    SELECT * FROM 
    "security_lake_visualization"."security_hub_new_failed_findings_by_created_time" limit 10;

Create a QuickSight dataset

Now that you’ve done a sample query and created a view, you can use Athena as the data source to create a dataset in QuickSight.

To create a QuickSight dataset

  1. Sign in to your QuickSight visualization account (also known as Account B), and open the QuickSight console.
  2. If this is the first time you’re using QuickSight, you need to sign up for a QuickSight subscription.
  3. Although there are multiple ways to sign in to QuickSight, we used AWS Identity and Access Management (IAM) based access to build the dashboards. To use QuickSight with Athena and Lake Formation, you first need to authorize connections through Lake Formation.
  4. When using cross-account configuration with AWS Glue Catalog, you also need to configure permissions on tables that are shared through Lake Formation. For a detailed deep dive, see Use Amazon Athena and Amazon QuickSight in a cross-account environment. For the use case highlighted in this post, use the following steps to grant access on the cross-account tables in the Glue Catalog.
    1. In the AWS Lake Formation console, navigate to the Tables section and select the resource link for the table, for example amazon_security_lake_table_ap_southeast_2_sh_findings_1_0.
    2. Select Actions. Under Permissions, select Grant on target.
    3. For Principals, select SAML users and groups, and then add the QuickSight user’s ARN captured in step 2 of the topic Authorize connections through Lake Formation.
    4. For the LF-Tags or catalog resources section, use the default settings.
    5. For Table permissions, choose Select for both Table Permissions and Grantable Permissions.
    6. Choose Grant.
    Figure 8: Granting permissions in Lake Formation

    Figure 8: Granting permissions in Lake Formation

  5. After permissions are in place, you can create datasets. You should also verify that you’re using QuickSight in the same Region where Lake Formation is sharing the data. The simplest way to determine your Region is to check the QuickSight URL in your web browser. The Region will be at the beginning of the URL. To change the Region, select the settings icon in the top right of the QuickSight screen and select the correct Region from the list of available Regions in the drop-down menu.
  6. Select Datasets, and then select New dataset. Select Athena from the list of available data sources.
  7. Enter a Data source name, for example security_lake_visualizations, and leave the Athena workgroup as [primary]. Then select Create data source.
  8. Select the tables to build your dashboards. On the Choose your table prompt, for Catalog, select AwsDataCatalog. For Database, select the database you created in the previous steps, for example security_lake_visualization. For Table, select the table with the name starting with amazon_security_lake_table_. Choose Select.
    Figure 9: Selecting the table for a new dataset

    Figure 9: Selecting the table for a new dataset

  9. On the Finish dataset creation prompt, select Import to SPICE for quicker analytics. Choose Visualize.
  10. In the left-hand menu in QuickSight, you can choose attributes from the data set to add analytics and widgets.

After you’re familiar with how to use QuickSight to visualize data from Security Lake, you can create additional datasets and add other widgets to create dashboards that are specific to your needs.

AWS pre-built QuickSight dashboards

So far, you’ve seen how to use Athena manually to query your data and how to use QuickSight to visualize it. AWS Professional Services is excited to announce the publication of the Data Visualization framework to help customers quickly visualize their data using QuickSight. The repository contains a combination of CDK tools and scripts that can be used to create the required AWS objects and deploy basic data sources, datasets, analysis, dashboards, and the required user groups to QuickSight with respect to Security Lake. The framework includes three pre-built dashboards based on the following personas.

Persona Role description Challenges Goals
CISO/Executive Stakeholder Owns and operates, with their support staff, all security-related activities within a business; total financial and risk accountability
  • Difficult to assess organizational aggregated security risk
  • Burdened by license costs of security tooling
  • Less agility in security programs due to mounting cost and complexity
  • Reduce risk
  • Reduce cost
  • Improve metrics (MTTD/MTTR and others)
Security Data Custodian Aggregates all security-related data sources while managing cost, access, and compliance
  • Writes new custom extract, transform, and load (ETL) every time a new data source shows up; difficult to maintain
  • Manually provisions access for users to view security data
  • Constrained by cost and performance limitations in tools depending on licenses and hardware
  • Reduce overhead to integrate new data
  • Improve data governance
  • Streamline access
Security Operator/Analyst Uses security tooling to monitor, assess, and respond to security-related events. Might perform incident response (IR), threat hunting, and other activities.
  • Moves between many security tools to answer questions about data
  • Lacks substantive automated analytics; manually processing and analyzing data
  • Can’t look historically due to limitations in tools (licensing, storage, scalability)
  • Reduce total number of tools
  • Increase observability
  • Decrease time to detect and respond
  • Decrease “alert fatigue”

After deploying through the CDK, you will have three pre-built dashboards configured and available to view. Once deployed, each of these dashboards can be customized according to your requirements. The Data Lake Executive dashboard provides a high-level overview of security findings, as shown in Figure 10.

Figure 10: Example QuickSight dashboard showing an overview of findings in Security Lake

Figure 10: Example QuickSight dashboard showing an overview of findings in Security Lake

The Security Lake custodian role will have visibility of security related data sources, as shown in Figure 11.

Figure 11: Security Lake custodian dashboard

Figure 11: Security Lake custodian dashboard

And the Security Lake operator will have a view of security related events, as shown in Figure 12.

Figure 12: Security Operator dashboard

Figure 12: Security Operator dashboard

Conclusion

In this post, you learned about Security Lake, and how you can use Athena to query your data and QuickSight to gain visibility of your security findings stored within Security Lake. When using QuickSight to visualize your data, it’s important to remember that the data remains in your S3 bucket within your own environment. However, if you have other use cases or wish to use other analytics tools such as OpenSearch, Security Lake gives you the freedom to choose how you want to interact with your data.

We also introduced the Data Visualization framework that was created by AWS Professional Services. The framework uses the CDK to deploy a set of pre-built dashboards to help get you up and running quickly.

With the announcement of AWS AppFabric, we’re making it even simpler to ingest data directly into Security Lake from leading SaaS applications without building and managing custom code or point-to-point integrations, enabling quick visualization of your data from a single place, in a common format.

For additional information on using Athena to query Security Lake, have a look at the AWS Security Analytics Bootstrap project, where you can find queries specific to each of the Security Lake natively ingested data sources. If you want to learn more about how to configure and use QuickSight to visualize findings, we have hands-on QuickSight workshops to help you configure and build QuickSight dashboards for visualizing your data.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Mark Keating

Mark Keating

Mark is an AWS Security Solutions Architect based out of the U.K. who works with Global Healthcare & Life Sciences and Automotive customers to solve their security and compliance challenges and help them reduce risk. He has over 20 years of experience working with technology, within in operations, solution, and enterprise architecture roles.

Pratima Singh

Pratima Singh

Pratima is a Security Specialist Solutions Architect with Amazon Web Services based out of Sydney, Australia. She is a security enthusiast who enjoys helping customers find innovative solutions to complex business challenges. Outside of work, Pratima enjoys going on long drives and spending time with her family at the beach.

David Hoang

David Hoang

David is a Shared Delivery Team Senior Security Consultant at AWS. His background is in AWS security, with a focus on automation. David designs, builds, and implements scalable enterprise solutions with security guardrails that use AWS services.

Ajay Rawat

Ajay Rawat

Ajay is a Security Consultant in a shared delivery team at AWS. He is a technology enthusiast who enjoys working with customers to solve their technical challenges and to improve their security posture in the cloud.

Implement model versioning with Amazon Redshift ML

Post Syndicated from Rohit Bansal original https://aws.amazon.com/blogs/big-data/implement-model-versioning-with-amazon-redshift-ml/

Amazon Redshift ML allows data analysts, developers, and data scientists to train machine learning (ML) models using SQL. In previous posts, we demonstrated how you can use the automatic model training capability of Redshift ML to train classification and regression models. Redshift ML allows you to create a model using SQL and specify your algorithm, such as XGBoost. You can use Redshift ML to automate data preparation, preprocessing, and selection of your problem type (for more information, refer to Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML). You can also bring a model previously trained in Amazon SageMaker into Amazon Redshift via Redshift ML for local inference. For local inference on models created in SageMaker, the ML model type must be supported by Redshift ML. However, remote inference is available for model types that are not natively available in Redshift ML.

Over time, ML models grow old, and even if nothing drastic happens, small changes accumulate. Common reasons why ML models needs to be retrained or audited include:

  • Data drift – Because your data has changed over time, the prediction accuracy of your ML models may begin to decrease compared to the accuracy exhibited during testing
  • Concept drift – The ML algorithm that was initially used may need to be changed due to different business environments and other changing needs

You may need to refresh the model on a regular basis, automate the process, and reevaluate your model’s improved accuracy. As of this writing, Amazon Redshift doesn’t support versioning of ML models. In this post, we show how you can use the bring your own model (BYOM) functionality of Redshift ML to implement versioning of Redshift ML models.

We use local inference to implement model versioning as part of operationalizing ML models. We assume that you have a good understanding of your data and the problem type that is most applicable for your use case, and have created and deployed models to production.

Solution overview

In this post, we use Redshift ML to build a regression model that predicts the number of people that may use the city of Toronto’s bike sharing service at any given hour of a day. The model accounts for various aspects, including holidays and weather conditions, and because we need to predict a numerical outcome, we used a regression model. We use data drift as a reason for retraining the model, and use model versioning as part of the solution.

After a model is validated and is being used on a regular basis for running predictions, you can create versions of the models, which requires you to retrain the model using an updated training set and possibly a different algorithm. Versioning serves two main purposes:

  • You can refer to prior versions of a model for troubleshooting or audit purposes. This enables you to ensure that your model still retains high accuracy before switching to a newer model version.
  • You can continue to run inference queries on the current version of a model during the model training process of the new version.

At the time of this writing, Redshift ML doesn’t have native versioning capabilities, but you can still achieve versioning by implementing a few simple SQL techniques by using the BYOM capability. BYOM was introduced to support pre-trained SageMaker models to run your inference queries in Amazon Redshift. In this post, we use the same BYOM technique to create a version of an existing model built using Redshift ML.

The following figure illustrates this workflow.

In the following sections, we show you how to can create a version from an existing model and then perform model retraining.

Prerequisites

As a prerequisite for implementing the example in this post, you need to set up a Redshift cluster or Amazon Redshift Serverless endpoint. For the preliminary steps to get started and set up your environment, refer to Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.

We use the regression model created in the post Build regression models with Amazon Redshift ML. We assume that it is already been deployed and use this model to create new versions and retrain the model.

Create a version from the existing model

The first step is to create a version of the existing model (which means saving developmental changes of the model) so that a history is maintained and the model is available for comparison later on.

The following code is the generic format of the CREATE MODEL command syntax; in the next step, you get the information needed to use this command to create a new version:

CREATE MODEL model_name
    FROM ('job_name' | 's3_path' )
    FUNCTION function_name ( data_type [, ...] )
    RETURNS data_type
    IAM_ROLE { default }
    [ SETTINGS (
      S3_BUCKET 'bucket', | --required
      KMS_KEY_ID 'kms_string') --optional
    ];

Next, we collect and apply the input parameters to the preceding CREATE MODEL code to the model. We need the job name and the data types of the model input and output values. We collect these by running the show model command on our existing model. Run the following command in Amazon Redshift Query Editor v2:

show model predict_rental_count;

Note the values for AutoML Job Name, Function Parameter Types, and the Target Column (trip_count) from the model output. We use these values in the CREATE MODEL command to create the version.

The following CREATE MODEL statement creates a version of the current model using the values collected from our show model command. We append the date (the example format is YYYYMMDD) to the end of the model and function names to track when this new version was created.

CREATE MODEL predict_rental_count_20230706 
FROM 'redshiftml-20230706171639810624' 
FUNCTION predict_rental_count_20230706 (int4, int4, int4, int4, int4, int4, int4, numeric, numeric, int4)
RETURNS float8 
IAM_ROLE default
SETTINGS (
S3_BUCKET '<<your S3 Bucket>>');

This command may take few minutes to complete. When it’s complete, run the following command:

show model predict_rental_count_20230706;

We can observe the following in the output:

  • AutoML Job Name is the same as the original version of the model
  • Function Name shows the new name, as expected
  • Inference Type shows Local, which designates this is BYOM with local inference

You can run inference queries using both versions of the model to validate the inference outputs.

The following screenshot shows the output of the model inference using the original version.

The following screenshot shows the output of model inference using the version copy.

As you can see, the inference outputs are the same.

You have now learned how to create a version of a previously trained Redshift ML model.

Retrain your Redshift ML model

After you create a version of an existing model, you can retrain the existing model by simply creating a new model.

You can create and train a new model using same CREATE MODEL command but using different input parameters, datasets, or problem types as applicable. For this post, we retrain the model on newer datasets. We append _new to the model name so it’s similar to the existing model for identification purposes.

In the following code, we use the CREATE MODEL command with a new dataset available in the training_data table:

CREATE MODEL predict_rental_count_new
FROM training_data
TARGET trip_count
FUNCTION predict_rental_count_new
IAM_ROLE 'arn:aws:iam::<accountid>:role/RedshiftML'
PROBLEM_TYPE regression
OBJECTIVE 'mse'
SETTINGS (s3_bucket 'redshiftml-<your-account-id>',
          s3_garbage_collect off,
          max_runtime 5000);

Run the following command to check the status of the new model:

show model predict_rental_count_new;

Replace the existing Redshift ML model with the retrained model

The last step is to replace the existing model with the retrained model. We do this by dropping the original version of the model and recreating a model using the BYOM technique.

First, check your retrained model to ensure the MSE/RMSE scores are staying stable between model training runs. To validate the models, you can run inferences by each of the model functions on your dataset and compare the results. We use the inference queries provided in Build regression models with Amazon Redshift ML.

After validation, you can replace your model.

Start by collecting the details of the predict_rental_count_new model.

Note the AutoML Job Name value, the Function Parameter Types values, and the Target Column name in the model output.

Replace the original model by dropping the original model and then creating the model with the original model and function names to make sure the existing references to the model and function names work:

drop model predict_rental_count;
CREATE MODEL predict_rental_count
FROM 'redshiftml-20230706171639810624' 
FUNCTION predict_rental_count(int4, int4, int4, int4, int4, int4, int4, numeric, numeric, int4)
RETURNS float8 
IAM_ROLE default
SETTINGS (
S3_BUCKET ’<<your S3 Bucket>>’);

The model creation should complete in a few minutes. You can check the status of the model by running the following command:

show model predict_rental_count;

When the model status is ready, the newer version predict_rental_count of your existing model is available for inference and the original version of the ML model predict_rental_count_20230706 is available for reference if needed.

Please refer to this GitHub repository for sample scripts to automate model versioning.

Conclusion

In this post, we showed how you can use the BYOM feature of Redshift ML to do model versioning. This allows you to have a history of your models so that you can compare model scores over time, respond to audit requests, and run inferences while training a new model.

For more information about building different models with Redshift ML, refer to Amazon Redshift ML.


About the Authors

Rohit Bansal is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to build next-generation analytics solutions using other AWS Analytics services.

Phil Bates is a Senior Analytics Specialist Solutions Architect at AWS. He has more than 25 years of experience implementing large-scale data warehouse solutions. He is passionate about helping customers through their cloud journey and using the power of ML within their data warehouse.

Use Snowflake with Amazon MWAA to orchestrate data pipelines

Post Syndicated from Payal Singh original https://aws.amazon.com/blogs/big-data/use-snowflake-with-amazon-mwaa-to-orchestrate-data-pipelines/

This blog post is co-written with James Sun from Snowflake.

Customers rely on data from different sources such as mobile applications, clickstream events from websites, historical data, and more to deduce meaningful patterns to optimize their products, services, and processes. With a data pipeline, which is a set of tasks used to automate the movement and transformation of data between different systems, you can reduce the time and effort needed to gain insights from the data. Apache Airflow and Snowflake have emerged as powerful technologies for data management and analysis.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed workflow orchestration service for Apache Airflow that you can use to set up and operate end-to-end data pipelines in the cloud at scale. The Snowflake Data Cloud provides a single source of truth for all your data needs and allows your organizations to store, analyze, and share large amounts of data. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines.

In this post, we provide an overview of orchestrating your data pipeline using Snowflake operators in your Amazon MWAA environment. We define the steps needed to set up the integration between Amazon MWAA and Snowflake. The solution provides an end-to-end automated workflow that includes data ingestion, transformation, analytics, and consumption.

Overview of solution

The following diagram illustrates our solution architecture.

Solution Overview

The data used for transformation and analysis is based on the publicly available New York Citi Bike dataset. The data (zipped files), which includes rider demographics and trip data, is copied from the public Citi Bike Amazon Simple Storage Service (Amazon S3) bucket in your AWS account. Data is decompressed and stored in a different S3 bucket (transformed data can be stored in the same S3 bucket where data was ingested, but for simplicity, we’re using two separate S3 buckets). The transformed data is then made accessible to Snowflake for data analysis. The output of the queried data is published to Amazon Simple Notification Service (Amazon SNS) for consumption.

Amazon MWAA uses a directed acyclic graph (DAG) to run the workflows. In this post, we run three DAGs:

The following diagram illustrates this workflow.

DAG run workflow

See the GitHub repo for the DAGs and other files related to the post.

Note that in this post, we’re using a DAG to create a Snowflake connection, but you can also create the Snowflake connection using the Airflow UI or CLI.

Prerequisites

To deploy the solution, you should have a basic understanding of Snowflake and Amazon MWAA with the following prerequisites:

  • An AWS account in an AWS Region where Amazon MWAA is supported.
  • A Snowflake account with admin credentials. If you don’t have an account, sign up for a 30-day free trial. Select the Snowflake enterprise edition for the AWS Cloud platform.
  • Access to Amazon MWAA, Secrets Manager, and Amazon SNS.
  • In this post, we’re using two S3 buckets, called airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID. Amazon S3 supports global buckets, which means that each bucket name must be unique across all AWS accounts in all the Regions within a partition. If the S3 bucket name is already taken, choose a different S3 bucket name. Create the S3 buckets in your AWS account. We upload content to the S3 bucket later in the post. Replace ACCOUNT_ID with your own AWS account ID or any other unique identifier. The bucket details are as follows:
    • airflow-blog-bucket-ACCOUNT_ID – The top-level bucket for Amazon MWAA-related files.
    • airflow-blog-bucket-ACCOUNT_ID/requirements – The bucket used for storing the requirements.txt file needed to deploy Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags – The bucked used for storing the DAG files to run workflows in Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries – The bucket used for storing the Snowflake SQL queries.
    • citibike-tripdata-destination-ACCOUNT_ID – The bucket used for storing the transformed dataset.

When implementing the solution in this post, replace references to airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID with the names of your own S3 buckets.

Set up the Amazon MWAA environment

First, you create an Amazon MWAA environment. Before deploying the environment, upload the requirements file to the airflow-blog-bucket-ACCOUNT_ID/requirements S3 bucket. The requirements file is based on Amazon MWAA version 2.6.3. If you’re testing on a different Amazon MWAA version, update the requirements file accordingly.

Complete the following steps to set up the environment:

  1. On the Amazon MWAA console, choose Create environment.
  2. Provide a name of your choice for the environment.
  3. Choose Airflow version 2.6.3.
  4. For the S3 bucket, enter the path of your bucket (s3:// airflow-blog-bucket-ACCOUNT_ID).
  5. For the DAGs folder, enter the DAGs folder path (s3:// airflow-blog-bucket-ACCOUNT_ID/dags).
  6. For the requirements file, enter the requirements file path (s3:// airflow-blog-bucket-ACCOUNT_ID/ requirements/requirements.txt).
  7. Choose Next.
  8. Under Networking, choose your existing VPC or choose Create MWAA VPC.
  9. Under Web server access, choose Public network.
  10. Under Security groups, leave Create new security group selected.
  11. For the Environment class, Encryption, and Monitoring sections, leave all values as default.
  12. In the Airflow configuration options section, choose Add custom configuration value and configure two values:
    1. Set Configuration option to secrets.backend and Custom value to airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend.
    2. Set Configuration option to secrets.backend_kwargs and Custom value to {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}.                      Configuration options for secret manager
  13. In the Permissions section, leave the default settings and choose Create a new role.
  14. Choose Next.
  15. When the Amazon MWAA environment us available, assign S3 bucket permissions to the AWS Identity and Access Management (IAM) execution role (created during the Amazon MWAA install).

MWAA execution role
This will direct you to the created execution role on the IAM console.

For testing purposes, you can choose Add permissions and add the managed AmazonS3FullAccess policy to the user instead of providing restricted access. For this post, we provide only the required access to the S3 buckets.

  1. On the drop-down menu, choose Create inline policy.
  2. For Select Service, choose S3.
  3. Under Access level, specify the following:
    1. Expand List level and select ListBucket.
    2. Expand Read level and select GetObject.
    3. Expand Write level and select PutObject.
  4. Under Resources, choose Add ARN.
  5. On the Text tab, provide the following ARNs for S3 bucket access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID (use your own bucket).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID (use your own bucket).
    3. arn:aws:s3:::tripdata (this is the public S3 bucket where the Citi Bike dataset is stored; use the ARN as specified here).
  6. Under Resources, choose Add ARN.
  7. On the Text tab, provide the following ARNs for S3 bucket access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID/* (make sure to include the asterisk).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID /*.
    3. arn:aws:s3:::tripdata/* (this is the public S3 bucket where the Citi Bike dataset is stored, use the ARN as specified here).
  8. Choose Next.
  9. For Policy name, enter S3ReadWrite.
  10. Choose Create policy.
  11. Lastly, provide Amazon MWAA with permission to access Secrets Manager secret keys.

This step provides the Amazon MWAA execution role for your Amazon MWAA environment read access to the secret key in Secrets Manager.

The execution role should have the policies MWAA-Execution-Policy*, S3ReadWrite, and SecretsManagerReadWrite attached to it.

MWAA execution role policies

When the Amazon MWAA environment is available, you can sign in to the Airflow UI from the Amazon MWAA console using link for Open Airflow UI.

Airflow UI access

Set up an SNS topic and subscription

Next, you create an SNS topic and add a subscription to the topic. Complete the following steps:

  1. On the Amazon SNS console, choose Topics from the navigation pane.
  2. Choose Create topic.
  3. For Topic type, choose Standard.
  4. For Name, enter mwaa_snowflake.
  5. Leave the rest as default.
  6. After you create the topic, navigate to the Subscriptions tab and choose Create subscription.
    SNS topic
  7. For Topic ARN, choose mwaa_snowflake.
  8. Set the protocol to Email.
  9. For Endpoint, enter your email ID (you will get a notification in your email to accept the subscription).

By default, only the topic owner can publish and subscribe to the topic, so you need to modify the Amazon MWAA execution role access policy to allow Amazon SNS access.

  1. On the IAM console, navigate to the execution role you created earlier.
  2. On the drop-down menu, choose Create inline policy.
    MWAA execution role SNS policy
  3. For Select service, choose SNS.
  4. Under Actions, expand Write access level and select Publish.
  5. Under Resources, choose Add ARN.
  6. On the Text tab, specify the ARN arn:aws:sns:<<region>>:<<our_account_ID>>:mwaa_snowflake.
  7. Choose Next.
  8. For Policy name, enter SNSPublishOnly.
  9. Choose Create policy.

Configure a Secrets Manager secret

Next, we set up Secrets Manager, which is a supported alternative database for storing Snowflake connection information and credentials.

To create the connection string, the Snowflake host and account name is required. Log in to your Snowflake account, and under the Worksheets menu, choose the plus sign and select SQL worksheet. Using the worksheet, run the following SQL commands to find the host and account name.

Run the following query for the host name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'"','') AS HOST
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

Run the following query for the account name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT';

Next, we configure the secret in Secrets Manager.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, choose Other type of secret.
  3. Under Key/Value pairs, choose the Plaintext tab.
  4. In the text field, enter the following code and modify the string to reflect your Snowflake account information:

{"host": "<<snowflake_host_name>>", "account":"<<snowflake_account_name>>","user":"<<snowflake_username>>","password":"<<snowflake_password>>","schema":"mwaa_schema","database":"mwaa_db","role":"accountadmin","warehouse":"dev_wh"}

For example:

{"host": "xxxxxx.snowflakecomputing.com", "account":"xxxxxx" ,"user":"xxxxx","password":"*****","schema":"mwaa_schema","database":"mwaa_db", "role":"accountadmin","warehouse":"dev_wh"}

The values for the database name, schema name, and role should be as mentioned earlier. The account, host, user, password, and warehouse can differ based on your setup.

Secret information

Choose Next.

  1. For Secret name, enter airflow/connections/snowflake_accountadmin.
  2. Leave all other values as default and choose Next.
  3. Choose Store.

Take note of the Region in which the secret was created under Secret ARN. We later define it as a variable in the Airflow UI.

Configure Snowflake access permissions and IAM role

Next, log in to your Snowflake account. Ensure the account you are using has account administrator access. Create a SQL worksheet. Under the worksheet, create a warehouse named dev_wh.

The following is an example SQL command:

CREATE OR REPLACE WAREHOUSE dev_wh 
 WITH WAREHOUSE_SIZE = 'xsmall' 
 AUTO_SUSPEND = 60 
 INITIALLY_SUSPENDED = true
 AUTO_RESUME = true
 MIN_CLUSTER_COUNT = 1
 MAX_CLUSTER_COUNT = 5;

For Snowflake to read data from and write data to an S3 bucket referenced in an external (S3 bucket) stage, a storage integration is required. Follow the steps defined in Option 1: Configuring a Snowflake Storage Integration to Access Amazon S3(only perform Steps 1 and 2, as described in this section).

Configure access permissions for the S3 bucket

While creating the IAM policy, a sample policy document code is needed (see the following code), which provides Snowflake with the required permissions to load or unload data using a single bucket and folder path. The bucket name used in this post is citibike-tripdata-destination-ACCOUNT_ID. You should modify it to reflect your bucket name.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3::: citibike-tripdata-destination-ACCOUNT_ID/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID"
    }
  ]
}

Create the IAM role

Next, you create the IAM role to grant privileges on the S3 bucket containing your data files. After creation, record the Role ARN value located on the role summary page.

Snowflake IAM role

Configure variables

Lastly, configure the variables that will be accessed by the DAGs in Airflow. Log in to the Airflow UI and on the Admin menu, choose Variables and the plus sign.

Airflow variables

Add four variables with the following key/value pairs:

  • Key aws_role_arn with value <<snowflake_aws_role_arn>> (the ARN for role mysnowflakerole noted earlier)
  • Key destination_bucket with value <<bucket_name>> (for this post, the bucket used in citibike-tripdata-destination-ACCOUNT_ID)
  • Key target_sns_arn with value <<sns_Arn>> (the SNS topic in your account)
  • Key sec_key_region with value <<region_of_secret_deployment>> (the Region where the secret in Secrets Manager was created)

The following screenshot illustrates where to find the SNS topic ARN.

SNS topic ARN

The Airflow UI will now have the variables defined, which will be referred to by the DAGs.

Airflow variables list

Congratulations, you have completed all the configuration steps.

Run the DAG

Let’s look at how to run the DAGs. To recap:

  • DAG1 (create_snowflake_connection_blog.py) – Creates the Snowflake connection in Apache Airflow. This connection will be used to authenticate with Snowflake. The Snowflake connection string is stored in Secrets Manager, which is referenced in the DAG file.
  • DAG2 (create-snowflake_initial-setup_blog.py) – Creates the database, schema, storage integration, and stage in Snowflake.
  • DAG3 (run_mwaa_datapipeline_blog.py) – Runs the data pipeline, which will unzip files from the source public S3 bucket and copy them to the destination S3 bucket. The next task will create a table in Snowflake to store the data. Then the data from the destination S3 bucket will be copied into the table using a Snowflake stage. After the data is successfully copied, a view will be created in Snowflake, on top of which the SQL queries will be run.

To run the DAGs, complete the following steps:

  1. Upload the DAGs to the S3 bucket airflow-blog-bucket-ACCOUNT_ID/dags.
  2. Upload the SQL query files to the S3 bucket airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries.
  3. Log in to the Apache Airflow UI.
  4. Locate DAG1 (create_snowflake_connection_blog), un-pause it, and choose the play icon to run it.

You can view the run state of the DAG using the Grid or Graph view in the Airflow UI.

Dag1 run

After DAG1 runs, the Snowflake connection snowflake_conn_accountadmin is created on the Admin, Connections menu.

  1. Locate and run DAG2 (create-snowflake_initial-setup_blog).

Dag2 run

After DAG2 runs, the following objects are created in Snowflake:

  • The database mwaa_db
  • The schema mwaa_schema
  • The storage integration mwaa_citibike_storage_int
  • The stage mwaa_citibike_stg

Before running the final DAG, the trust relationship for the IAM user needs to be updated.

  1. Log in to your Snowflake account using your admin account credentials.
  2. Open your SQL worksheet created earlier and run the following command:
DESC INTEGRATION mwaa_citibike_storage_int;

mwaa_citibike_storage_int is the name of the integration created by the DAG2 in the previous step.

From the output, record the property value of the following two properties:

  • STORAGE_AWS_IAM_USER_ARN – The IAM user created for your Snowflake account.
  • STORAGE_AWS_EXTERNAL_ID – The external ID that is needed to establish a trust relationship.

Now we grant the Snowflake IAM user permissions to access bucket objects.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose the role mysnowflakerole.
  3. On the Trust relationships tab, choose Edit trust relationship.
  4. Modify the policy document with the DESC STORAGE INTEGRATION output values you recorded. For example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::5xxxxxxxx:user/mgm4-s- ssca0079"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "AWSPARTNER_SFCRole=4_vsarJrupIjjJh77J9Nxxxx/j98="
        }
      }
    }
  ]
}

The AWS role ARN and ExternalId will be different for your environment based on the output of the DESC STORAGE INTEGRATION query

Trust relationship

  1. Locate and run the final DAG (run_mwaa_datapipeline_blog).

At the end of the DAG run, the data is ready for querying. In this example, the query (finding the top start and destination stations) is run as part of the DAG and the output can be viewed from the Airflow XCOMs UI.

Xcoms

In the DAG run, the output is also published to Amazon SNS and based on the subscription, an email notification is sent out with the query output.

Email

Another method to visualize the results is directly from the Snowflake console using the Snowflake worksheet. The following is an example query:

SELECT START_STATION_NAME,
COUNT(START_STATION_NAME) C FROM MWAA_DB.MWAA_SCHEMA.CITIBIKE_VW 
GROUP BY 
START_STATION_NAME ORDER BY C DESC LIMIT 10;

Snowflake visual

There are different ways to visualize the output based on your use case.

As we observed, DAG1 and DAG2 need to be run only one time to set up the Amazon MWAA connection and Snowflake objects. DAG3 can be scheduled to run every week or month. With this solution, the user examining the data doesn’t have to log in to either Amazon MWAA or Snowflake. You can have an automated workflow triggered on a schedule that will ingest the latest data from the Citi Bike dataset and provide the top start and destination stations for the given dataset.

Clean up

To avoid incurring future charges, delete the AWS resources (IAM users and roles, Secrets Manager secrets, Amazon MWAA environment, SNS topics and subscription, S3 buckets) and Snowflake resources (database, stage, storage integration, view, tables) created as part of this post.

Conclusion

In this post, we demonstrated how to set up an Amazon MWAA connection for authenticating to Snowflake as well as to AWS using AWS user credentials. We used a DAG to automate creating the Snowflake objects such as database, tables, and stage using SQL queries. We also orchestrated the data pipeline using Amazon MWAA, which ran tasks related to data transformation as well as Snowflake queries. We used Secrets Manager to store Snowflake connection information and credentials and Amazon SNS to publish the data output for end consumption.

With this solution, you have an automated end-to-end orchestration of your data pipeline encompassing ingesting, transformation, analysis, and data consumption.

To learn more, refer to the following resources:


About the authors

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partner and customers modernize and migrate their applications to AWS.

James Sun is a Senior Partner Solutions Architect at Snowflake. He actively collaborates with strategic cloud partners like AWS, supporting product and service integrations, as well as the development of joint solutions. He has held senior technical positions at tech companies such as EMC, AWS, and MapR Technologies. With over 20 years of experience in storage and data analytics, he also holds a PhD from Stanford University.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Manuj Arora is a Sr. Solutions Architect for Strategic Accounts in AWS. He focuses on Migration and Modernization capabilities and offerings in AWS. Manuj has worked as a Partner Success Solutions Architect in AWS over the last 3 years and worked with partners like Snowflake to build solution blueprints that are leveraged by the customers. Outside of work, he enjoys traveling, playing tennis and exploring new places with family and friends.

Spark on AWS Lambda: An Apache Spark runtime for AWS Lambda

Post Syndicated from John Cherian original https://aws.amazon.com/blogs/big-data/spark-on-aws-lambda-an-apache-spark-runtime-for-aws-lambda/

Spark on AWS Lambda (SoAL) is a framework that runs Apache Spark workloads on AWS Lambda. It’s designed for both batch and event-based workloads, handling data payload sizes from 10 KB to 400 MB. This framework is ideal for batch analytics workloads from Amazon Simple Storage Service (Amazon S3) and event-based streaming from Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Kinesis. The framework seamlessly integrates data with platforms like Apache Iceberg, Apache Delta Lake, Apache HUDI, Amazon Redshift, and Snowflake, offering a low-cost and scalable data processing solution. SoAL provides a framework that enables you to run data-processing engines like Apache Spark and take advantage of the benefits of serverless architecture, like auto scaling and compute for analytics workloads.

This post highlights the SoAL architecture, provides infrastructure as code (IaC), offers step-by-step instructions for setting up the SoAL framework in your AWS account, and outlines SoAL architectural patterns for enterprises.

Solution overview

Apache Spark offers cluster mode and local mode deployments, with the former incurring latency due to the cluster initialization and warmup. Although Apache Spark’s cluster-based engines are commonly used for data processing, especially with ACID frameworks, they exhibit high resource overhead and slower performance for payloads under 50 MB compared to the more efficient Pandas framework for smaller datasets. When compared to Apache Spark cluster mode, local mode provides faster initialization and better performance for small analytics workloads. The Apache Spark local mode on the SoAL framework is optimized for small analytics workloads, and cluster mode is optimized for larger analytics workloads, making it a versatile framework for enterprises.

We provide an AWS Serverless Application Model (AWS SAM) template, available in the GitHub repo, to deploy the SoAL framework in an AWS account. The AWS SAM template builds the Docker image, pushes it to the Amazon Elastic Container Registry (Amazon ECR) repository, and then creates the Lambda function. The AWS SAM template expedites the setup and adoption of the SoAL framework for AWS customers.

SoAL architecture

The SoAL framework provides local mode and containerized Apache Spark running on Lambda. In the SoAL framework, Lambda runs in a Docker container with Apache Spark and AWS dependencies installed. On invocation, the SoAL framework’s Lambda handler fetches the PySpark script from an S3 folder and submits the Spark job on Lambda. The logs for the Spark jobs are recorded in Amazon CloudWatch.

For both streaming and batch tasks, the Lambda event is sent to the PySpark script as a named argument. Utilizing a container-based image cache along with the warm instance features of Lambda, it was found that the overall JVM warmup time reduced from approx. 70 seconds to under 30 seconds. It was observed that the framework performs well with batch payloads up to 400 MB and streaming data from Amazon MSK and Kinesis. The per-session costs for any given analytics workload depends on the number of requests, the run duration, and the memory configured for the Lambda functions.

The following diagram illustrates the SoAL architecture.

Enterprise architecture

The PySpark script is developed in standard Spark and is compatible with the SoAL framework, Amazon EMR, Amazon EMR Serverless, and AWS Glue. If needed, you can use the PySpark scripts in cluster mode on Amazon EMR, EMR Serverless, and AWS Glue. For analytics workloads with a size between a few KBs and 400 MB, you can use the SoAL framework on Lambda and in larger analytics workload scenarios over 400 MB, and run the same PySpark script on AWS cluster-based tools like Amazon EMR, EMR Serverless, and AWS Glue. The extensible script and architecture make SoAL a scalable framework for analytics workloads for enterprises. The following diagram illustrates this architecture.

Prerequisites

To implement this solution, you need an AWS Identity and Access Management (IAM) role with permission to AWS CloudFormation, Amazon ECR, Lambda, and AWS CodeBuild.

Set up the solution

To set up the solution in an AWS account, complete the following steps:

  1. Clone the GitHub repository to local storage and change the directory within the cloned folder to the CloudFormation folder:
    git clone https://github.com/aws-samples/spark-on-aws-lambda.git

  2. Run the AWS SAM template sam-imagebuilder.yaml using the following command with the stack name and framework of your choice. In this example, the framework is Apache HUDI:
    sam deploy --template-file sam-imagebuilder.yaml --stack-name spark-on-lambda-image-builder --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM --resolve-s3 --parameter-overrides 'ParameterKey=FRAMEWORK,ParameterValue=HUDI'

The command deploys a CloudFormation stack called spark-on-lambda-image-builder. The command runs a CodeBuild project that builds and pushes the Docker image with the latest tag to Amazon ECR. The command has a parameter called ParameterValue for each open-source framework (Apache Delta, Apache HUDI, and Apache Iceberg).

  1. After the stack has been successfully deployed, copy the ECR repository URI (spark-on-lambda-image-builder) that is displayed in the output of the stack.
  2. Run the AWS SAM Lambda package with the required Region and ECR repository:
    sam deploy --template-file sam-template.yaml --stack-name spark-on-lambda-stack --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM --resolve-s3 --image-repository <accountno>.dkr.ecr.us-east-1.amazonaws.com/sparkonlambda-spark-on-lambda-image-builder --parameter-overrides 'ParameterKey=ScriptBucket,ParameterValue=<Provide the s3 bcucket of the script> ParameterKey=SparkScript,ParameterValue=<provide s3 folder lcoation> ParameterKey=ImageUri,ParameterValue=<accountno>.dkr.ecr.us-east-1.amazonaws.com/sparkonlambda-spark-on-lambda-image-builder:latest'

This command creates the Lambda function with the container image from the ECR repository. An output file packaged-template.yaml is created in the local directory.

  1. Optionally, to publish the AWS SAM application to the AWS Serverless Application Repository, run the following command. This allows AWS SAM template sharing with the GUI interface using AWS Serverless Application Repository and other developers to use quick deployments in the future.
    sam publish --template packaged-template.yaml

After you run this command, a Lambda function is created using the SoAL framework runtime.

  1. To test it, use PySpark scripts from the spark-scripts folder. Place the sample script and accomodations.csv dataset in an S3 folder and provide the location via the Lambda environment variables SCRIPT_BUCKET and SCRIPT_LOCATION.

After Lambda is invoked, it uploads the PySpark script from the S3 folder to a container local storage and runs it on the SoAL framework container using SPARK-SUBMIT. The Lambda event is also passed to the PySpark script.

Clean up

Deploying an AWS SAM template incurs costs. Delete the Docker image from Amazon ECR, delete the Lambda function, and remove all the files or scripts from the S3 location. You can also use the following command to delete the stack:

sam delete --stack-name spark-on-lambda-stack

Conclusion

The SoAL framework enables you to run Apache Spark serverless tasks on AWS Lambda efficiently and cost-effectively. Beyond cost savings, it ensures swift processing times for small to medium files. As a holistic enterprise vision, SoAL seamlessly bridges the gap between big and small data processing, using the power of the Apache Spark runtime across both Lambda and other cluster-based AWS resources.

Follow the steps in this post to use the SoAL framework in your AWS account, and leave a comment if you have any questions.


About the authors

John Cherian is Senior Solutions Architect(SA) at Amazon Web Services helps customers with strategy and architecture for building solutions on AWS.

Emerson Antony is Senior Cloud Architect at Amazon Web Services helps customers with implementing AWS solutions.

Kiran Anand is Principal AWS Data Lab Architect at Amazon Web Services helps customers with Big data & Analytics architecture.

Unlock scalable analytics with AWS Glue and Google BigQuery

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/unlock-scalable-analytics-with-aws-glue-and-google-bigquery/

Data integration is the foundation of robust data analytics. It encompasses the discovery, preparation, and composition of data from diverse sources. In the modern data landscape, accessing, integrating, and transforming data from diverse sources is a vital process for data-driven decision-making. AWS Glue, a serverless data integration and extract, transform, and load (ETL) service, has revolutionized this process, making it more accessible and efficient. AWS Glue eliminates complexities and costs, allowing organizations to perform data integration tasks in minutes, boosting efficiency.

This blog post explores the newly announced managed connector for Google BigQuery and demonstrates how to build a modern ETL pipeline with AWS Glue Studio without writing code.

Overview of AWS Glue

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing your data and putting it to use in minutes instead of months. AWS Glue provides both visual and code-based interfaces to make data integration easier. Users can more easily find and access data using the AWS Glue Data Catalog. Data engineers and ETL (extract, transform, and load) developers can visually create, run, and monitor ETL workflows in a few steps in AWS Glue Studio. Data analysts and data scientists can use AWS Glue DataBrew to visually enrich, clean, and normalize data without writing code.

Introducing Google BigQuery Spark connector

To meet the demands of diverse data integration use cases, AWS Glue now offers a native spark connector for Google BigQuery. Customers can now use AWS Glue 4.0 for Spark to read from and write to tables in Google BigQuery. Additionally, you can read an entire table or run a custom query and write your data using direct and indirect writing methods. You connect to BigQuery using service account credentials stored securely in AWS Secrets Manager.

Benefits of Google BigQuery Spark connector

  • Seamless integration: The native connector offers an intuitive and streamlined interface for data integration, reducing the learning curve.
  • Cost efficiency: Building and maintaining custom connectors can be expensive. The native connector provided by AWS Glue is a cost-effective alternative.
  • Efficiency: Data transformation tasks that previously took weeks or months can now be accomplished within minutes, optimizing efficiency.

Solution overview

In this example, you create two ETL jobs using AWS Glue with the native Google BigQuery connector.

  1. Query a BigQuery table and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format.
  2. Use the data extracted from the first job to transform and create an aggregated result to be stored in Google BigQuery.

solution architecture

Prerequisites

The dataset used in this solution is the NCEI/WDS Global Significant Earthquake Database, with a global listing of over 5,700 earthquakes from 2150 BC to the present. Copy this public data into your Google BigQuery project or use your existing dataset.

Configure BigQuery connections

To connect to Google BigQuery from AWS Glue, see Configuring BigQuery connections. You must create and store your Google Cloud Platform credentials in a Secrets Manager secret, then associate that secret with a Google BigQuery AWS Glue connection.

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

To create an S3 bucket:

  1. On the AWS Management Console for Amazon S3, choose Create bucket.
  2. Enter a globally unique Name for your bucket; for example, awsglue-demo.
  3. Choose Create bucket.

Create an IAM role for the AWS Glue ETL job

When you create the AWS Glue ETL job, you specify an AWS Identity and Access Management (IAM) role for the job to use. The role must grant access to all resources used by the job, including Amazon S3 (for any sources, targets, scripts, driver files, and temporary directories), and Secrets Manager.

For instructions, see Configure an IAM role for your ETL job.

Solution walkthrough

Create a visual ETL job in AWS Glue Studio to transfer data from Google BigQuery to Amazon S3

  1. Open the AWS Glue console.
  2. In AWS Glue, navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Enter a Name for your AWS Glue job, for example, bq-s3-dataflow.
  4. Select Google BigQuery as the data source.
    1. Enter a name for your Google BigQuery source node, for example, noaa_significant_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Enter a Parent project, for example, bigquery-public-datasources.
    4. Select Choose a single table for the BigQuery Source.
    5. Enter the table you want to migrate in the form [dataset].[table], for example, noaa_significant_earthquakes.earthquakes.
      big query data source for bq to amazon s3 dataflow
  5. Next, choose the data target as Amazon S3.
    1. Enter a Name for the target Amazon S3 node, for example, earthquakes.
    2. Select the output data Format as Parquet.
    3. Select the Compression Type as Snappy.
    4. For the S3 Target Location, enter the bucket created in the prerequisites, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    5. You should replace <YourBucketName> with the name of your bucket.
      s3 target node for bq to amazon s3 dataflow
  6. Next go to the Job details. In the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for bq to amazon s3 dataflow
  7. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue will run the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

run and monitor bq to amazon s3 dataflow

Data validation

  1. After the job has run successfully, validate the data in your S3 bucket to ensure it matches your expectations. You can see the results using Amazon S3 Select.

review results in amazon s3 from the bq to s3 dataflow run

Automate and schedule

  1. If needed, set up job scheduling to run the ETL process regularly. You can use AWS to automate your ETL jobs, ensuring your S3 bucket is always up to date with the latest data from Google BigQuery.

You’ve successfully configured an AWS Glue ETL job to transfer data from Google BigQuery to Amazon S3. Next, you create the ETL job to aggregate this data and transfer it to Google BigQuery.

Finding earthquake hotspots with AWS Glue Studio Visual ETL.

  1. Open AWS Glue console.
  2. In AWS Glue navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Provide a name for your AWS Glue job, for example, s3-bq-dataflow.
  4. Choose Amazon S3 as the data source.
    1. Enter a Name for the source Amazon S3 node, for example, earthquakes.
    2. Select S3 location as the S3 source type.
    3. Enter the S3 bucket created in the prerequisites as the S3 URL, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    4. You should replace <YourBucketName> with the name of your bucket.
    5. Select the Data format as Parquet.
    6. Select Infer schema.
      amazon s3 source node for s3 to bq dataflow
  5. Next, choose Select Fields transformation.
    1. Select earthquakes as Node parents.
    2. Select fields: id, eq_primary, and country.
      select field node for amazon s3 to bq dataflow
  6. Next, choose Aggregate transformation.
    1. Enter a Name, for example Aggregate.
    2. Choose Select Fields as Node parents.
    3. Choose eq_primary and country as the group by columns.
    4. Add id as the aggregate column and count as the aggregation function.
      aggregate node for amazon s3 to bq dataflow
  7. Next, choose RenameField transformation.
    1. Enter a name for the source Amazon S3 node, for example, Rename eq_primary.
    2. Choose Aggregate as Node parents.
    3. Choose eq_primary as the Current field name and enter earthquake_magnitude as the New field name.
      rename eq_primary field for amazon s3 to bq dataflow
  8. Next, choose RenameField transformation
    1. Enter a name for the source Amazon S3 node, for example, Rename count(id).
    2. Choose Rename eq_primary as Node parents.
    3. Choose count(id) as the Current field name and enter number_of_earthquakes as the New field name.
      rename cound(id) field for amazon s3 to bq dataflow
  9. Next, choose the data target as Google BigQuery.
    1. Provide a name for your Google BigQuery source node, for example, most_powerful_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Select Parent project, for example, bigquery-public-datasources.
    4. Enter the name of the Table you want to create in the form [dataset].[table], for example, noaa_significant_earthquakes.most_powerful_earthquakes.
    5. Choose Direct as the Write method.
      bq destination for amazon s3 to bq dataflow
  10. Next go to the Job details tab and in the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for amazon s3 to bq dataflow
  11. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue runs the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

monitor and run for amazon s3 to bq dataflow

Data validation

  1. After the job has run successfully, validate the data in your Google BigQuery dataset. This ETL job returns a list of countries where the most powerful earthquakes have occurred. It provides these by counting the number of earthquakes for a given magnitude by country.

aggregated results for amazon s3 to bq dataflow

Automate and schedule

  1. You can set up job scheduling to run the ETL process regularly. AWS Glue allows you to automate your ETL jobs, ensuring your S3 bucket is always up to date with the latest data from Google BigQuery.

That’s it! You’ve successfully set up an AWS Glue ETL job to transfer data from Amazon S3 to Google BigQuery. You can use this integration to automate the process of data extraction, transformation, and loading between these two platforms, making your data readily available for analysis and other applications.

Clean up

To avoid incurring charges, clean up the resources used in this blog post from your AWS account by completing the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. From the list of jobs, select the job bq-s3-data-flow and delete it.
  3. From the list of jobs, select the job s3-bq-data-flow and delete it.
  4. On the AWS Glue console, choose Connections in the navigation pane under Data Catalog.
  5. Choose the BiqQuery connection you created and delete it.
  6. On the Secrets Manager console, choose the secret you created and delete it.
  7. On the IAM console, choose Roles in the navigation pane, then select the role you created for the AWS Glue ETL job and delete it.
  8. On the Amazon S3 console, search for the S3 bucket you created, choose Empty to delete the objects, then delete the bucket.
  9. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.

Conclusion

The integration of AWS Glue with Google BigQuery simplifies the analytics pipeline, reduces time-to-insight, and facilitates data-driven decision-making. It empowers organizations to streamline data integration and analytics. The serverless nature of AWS Glue means no infrastructure management, and you pay only for the resources consumed while your jobs are running. As organizations increasingly rely on data for decision-making, this native spark connector provides an efficient, cost-effective, and agile solution to swiftly meet data analytics needs.

If you’re interested to see how to read from and write to tables in Google BigQuery in AWS Glue, take a look at step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on AWS Glue, visit AWS Glue.

Appendix

If you are looking to implement this example, using code instead of the AWS Glue console, use the following code snippets.

Reading data from Google BigQuery and writing data into Amazon S3

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Big Query Table 
noaa_significant_earthquakes_node1697123333266 = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "sourceType": "table",
            "table": "noaa_significant_earthquakes.earthquakes",
        },
        transformation_ctx="noaa_significant_earthquakes_node1697123333266",
    )
)
# STEP-2 Write the data read from Big Query Table into S3
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697157772747 = glueContext.write_dynamic_frame.from_options(
    frame=noaa_significant_earthquakes_node1697123333266,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="earthquakes_node1697157772747",
)

job.commit()

Reading and aggregating data from Amazon S3 and writing into Google BigQuery

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
from pyspark.sql import functions as SqlFuncs

def sparkAggregate(
    glueContext, parentFrame, groups, aggs, transformation_ctx
) -> DynamicFrame:
    aggsFuncs = []
    for column, func in aggs:
        aggsFuncs.append(getattr(SqlFuncs, func)(column))
    result = (
        parentFrame.toDF().groupBy(*groups).agg(*aggsFuncs)
        if len(groups) > 0
        else parentFrame.toDF().agg(*aggsFuncs)
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Amazon S3 bucket
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697218776818 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/"
        ],
        "recurse": True,
    },
    transformation_ctx="earthquakes_node1697218776818",
)

# STEP-2 Select fields
SelectFields_node1697218800361 = SelectFields.apply(
    frame=earthquakes_node1697218776818,
    paths=["id", "eq_primary", "country"],
    transformation_ctx="SelectFields_node1697218800361",
)

# STEP-3 Aggregate data
Aggregate_node1697218823404 = sparkAggregate(
    glueContext,
    parentFrame=SelectFields_node1697218800361,
    groups=["eq_primary", "country"],
    aggs=[["id", "count"]],
    transformation_ctx="Aggregate_node1697218823404",
)

Renameeq_primary_node1697219483114 = RenameField.apply(
    frame=Aggregate_node1697218823404,
    old_name="eq_primary",
    new_name="earthquake_magnitude",
    transformation_ctx="Renameeq_primary_node1697219483114",
)

Renamecountid_node1697220511786 = RenameField.apply(
    frame=Renameeq_primary_node1697219483114,
    old_name="`count(id)`",
    new_name="number_of_earthquakes",
    transformation_ctx="Renamecountid_node1697220511786",
)

# STEP-1 Write the aggregated data in Google BigQuery
most_powerful_earthquakes_node1697220563923 = (
    glueContext.write_dynamic_frame.from_options(
        frame=Renamecountid_node1697220511786,
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "writeMethod": "direct",
            "table": "noaa_significant_earthquakes.most_powerful_earthquakes",
        },
        transformation_ctx="most_powerful_earthquakes_node1697220563923",
    )
)

job.commit()


About the authors

Kartikay Khator is a Solutions Architect in Global Life Sciences at Amazon Web Services (AWS). He is passionate about building innovative and scalable solutions to meet the needs of customers, focusing on AWS Analytics services. Beyond the tech world, he is an avid runner and enjoys hiking.

Kamen SharlandjievKamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Anshul SharmaAnshul Sharma is a Software Development Engineer in AWS Glue Team. He is driving the connectivity charter which provide Glue customer native way of connecting any Data source (Data-warehouse, Data-lakes, NoSQL etc) to Glue ETL Jobs. Beyond the tech world, he is a cricket and soccer lover.

Amazon Redshift: Lower price, higher performance

Post Syndicated from Stefan Gromoll original https://aws.amazon.com/blogs/big-data/amazon-redshift-lower-price-higher-performance/

Like virtually all customers, you want to spend as little as possible while getting the best possible performance. This means you need to pay attention to price-performance. With Amazon Redshift, you can have your cake and eat it too! Amazon Redshift delivers up to 4.9 times lower cost per user and up to 7.9 times better price-performance than other cloud data warehouses on real-world workloads using advanced techniques like concurrency scaling to support hundreds of concurrent users, enhanced string encoding for faster query performance, and Amazon Redshift Serverless performance enhancements. Read on to understand why price-performance matters and how Amazon Redshift price-performance is a measure of how much it costs to get a particular level of workload performance, namely performance ROI (return on investment).

Because both price and performance enter into the price-performance calculation, there are two ways to think about price-performance. The first way is to hold price constant: if you have $1 to spend, how much performance do you get from your data warehouse? A database with better price-performance will deliver better performance for each $1 spent. Therefore, when holding price constant when comparing two data warehouses that cost the same, the database with better price-performance will run your queries faster. The second way to look at price-performance is to hold performance constant: if you need your workload to finish in 10 minutes, what will it cost? A database with better price-performance will run your workload in 10 minutes at a lower cost. Therefore, when holding performance constant when comparing two data warehouses that are sized to deliver the same performance, the database with better price-performance will cost less and save you money.

Finally, another important aspect of price-performance is predictability. Knowing how much your data warehouse is going to cost as the number of data warehouse users grows is crucial for planning. It should not only deliver the best price-performance today, but also scale predictably and deliver the best price-performance as more users and workloads are added. An ideal data warehouse should have linear scale—scaling your data warehouse to deliver twice the query throughput should ideally cost twice as much (or less).

In this post, we share performance results to illustrate how Amazon Redshift delivers significantly better price-performance compared to leading alternative cloud data warehouses. This means that if you spend the same amount on Amazon Redshift as you would on one of these other data warehouses, you will get better performance with Amazon Redshift. Alternatively, if you size your Redshift cluster to deliver the same performance, you will see lower costs compared to these alternatives.

Price-performance for real-world workloads

You can use Amazon Redshift to power a very wide diversity of workloads, from batch-processing of complex extract, transform, and load (ETL)-based reports, and real-time streaming analytics to low-latency business intelligence (BI) dashboards that need to serve hundreds or even thousands of users at the same time with subsecond response times, and everything in between. One of the ways we continually improve price-performance for our customers is to constantly review the software and hardware performance telemetry from the Redshift fleet, looking for opportunities and customer use cases where we can further improve Amazon Redshift performance.

Some recent examples of performance optimizations driven by fleet telemetry include:

  • String query optimizations – By analyzing how Amazon Redshift processed different data types in the Redshift fleet, we found that optimizing string-heavy queries would bring significant benefit to our customers’ workloads. (We discuss this in more detail later in this post.)
  • Automated materialized views – We found that Amazon Redshift customers often run many queries that have common subquery patterns. For example, several different queries may join the same three tables using the same join condition. Amazon Redshift is now able to automatically create and maintain materialized views and then transparently rewrite queries to use the materialized views using the machine-learned automated materialized view autonomics feature in Amazon Redshift. When enabled, automated materialized views can transparently increase query performance for repetitive queries without any user intervention. (Note that automated materialized views were not used in any of the benchmark results discussed in this post).
  • High-concurrency workloads – A growing use case we see is using Amazon Redshift to serve dashboard-like workloads. These workloads are characterized by desired query response times of single-digit seconds or less, with tens or hundreds of concurrent users running queries simultaneously with a spiky and often unpredictable usage pattern. The prototypical example of this is an Amazon Redshift-backed BI dashboard that has a spike in traffic Monday mornings when a large number of users start their week.

High-concurrency workloads in particular have very broad applicability: most data warehouse workloads operate at concurrency, and it’s not uncommon for hundreds or even thousands of users to run queries on Amazon Redshift at the same time. Amazon Redshift was designed to keep query response times predictable and fast. Redshift Serverless does this automatically for you by adding and removing compute as needed to keep query response times fast and predictable. This means a Redshift Serverless-backed dashboard that loads quickly when it’s being accessed by one or two users will continue to load quickly even when many users are loading it at the same time.

To simulate this type of workload, we used a benchmark derived from TPC-DS with a 100 GB data set. TPC-DS is an industry-standard benchmark that includes a variety of typical data warehouse queries. At this relatively small scale of 100 GB, queries in this benchmark run on Redshift Serverless in an average of a few seconds, which is representative of what users loading an interactive BI dashboard would expect. We ran between 1–200 concurrent tests of this benchmark, simulating between 1–200 users trying to load a dashboard at the same time. We also repeated the test against several popular alternative cloud data warehouses that also support scaling out automatically (if you’re familiar with the post Amazon Redshift continues its price-performance leadership, we didn’t include Competitor A because it doesn’t support automatically scaling up). We measured average query response time, meaning how long a user would wait for their queries to finish (or their dashboard to load). The results are shown in the following chart.

Competitor B scales well until around 64 concurrent queries, at which point it is unable to provide additional compute and queries begin to queue, leading to increased query response times. Although Competitor C is able to scale automatically, it scales to lower query throughput than both Amazon Redshift and Competitor B and is not able to keep query runtimes low. In addition, it doesn’t support queueing queries when it runs out of compute, which prevents it from scaling beyond around 128 concurrent users. Submitting additional queries beyond this are rejected by the system.

Here, Redshift Serverless is able to keep the query response time relatively consistent at around 5 seconds even when hundreds of users are running queries at the same time. The average query response times for Competitors B and C increase steadily as load on the warehouses increases, which results in users having to wait longer (up to 16 seconds) for their queries to return when the data warehouse is busy. This means that if a user is trying to refresh a dashboard (which may even submit several concurrent queries when reloaded), Amazon Redshift would be able to keep dashboard load times far more consistent even if the dashboard is being loaded by tens or hundreds of other users at the same time.

Because Amazon Redshift is able to deliver very high query throughput for short queries (as we wrote about in Amazon Redshift continues its price-performance leadership), it’s also able to handle these higher concurrencies when scaling out more efficiently and therefore at a significantly lower cost. To quantify this, we look at the price-performance using published on-demand pricing for each of the warehouses in the preceding test, shown in the following chart. It’s worth noting that using Reserved Instances (RIs), especially 3-year RIs purchased with the all upfront payment option, has the lowest cost to run Amazon Redshift on Provisioned clusters, resulting in the best relative price-performance compared to on-demand or other RI options.

So not only is Amazon Redshift able to deliver better performance at higher concurrencies, it’s able to do so at significantly lower cost. Each data point in the price-performance chart is equivalent to the cost to run the benchmark at the specified concurrency. Because the price-performance is linear, we can divide the cost to run the benchmark at any concurrency by the concurrency (number of Concurrent Users in this chart) to tell us how much adding each new user costs for this particular benchmark.

The preceding results are straightforward to replicate. All queries used in the benchmark are available in our GitHub repository and performance is measured by launching a data warehouse, enabling Concurrency Scaling on Amazon Redshift (or the corresponding auto scaling feature on other warehouses), loading the data out of the box (no manual tuning or database-specific setup), and then running a concurrent stream of queries at concurrencies from 1–200 in steps of 32 on each data warehouse. The same GitHub repo references pregenerated (and unmodified) TPC-DS data in Amazon Simple Storage Service (Amazon S3) at various scales using the official TPC-DS data generation kit.

Optimizing string-heavy workloads

As mentioned earlier, the Amazon Redshift team is continuously looking for new opportunities to deliver even better price-performance for our customers. One improvement we recently launched that significantly improved performance is an optimization that accelerates the performance of queries over string data. For example, you might want to find the total revenue generated from retail stores located in New York City with a query like SELECT sum(price) FROM sales WHERE city = ‘New York’. This query is applying a predicate over string data (city = ‘New York’). As you can imagine, string data processing is ubiquitous in data warehouse applications.

To quantify how often customers’ workloads access strings, we conducted a detailed analysis of string data type usage using fleet telemetry of tens of thousands of customer clusters managed by Amazon Redshift. Our analysis indicates that in 90% of the clusters, string columns constitute at least 30% of all the columns, and in 50% of the clusters, string columns constitute at least 50% of all the columns. Moreover, a majority of all queries run on the Amazon Redshift cloud data warehouse platform access at least one string column. Another important factor is that string data is very often low cardinality, meaning the columns contain a relatively small set of unique values. For example, although an orders table representing sales data may contain billions of rows, an order_status column within that table might contain only a few unique values across those billions of rows, such as pending, in process, and completed.

As of this writing, most string columns in Amazon Redshift are compressed with LZO or ZSTD algorithms. These are good general-purpose compression algorithms, but they aren’t designed to take advantage of low-cardinality string data. In particular, they require that data be decompressed before being operated on, and are less efficient in their use of hardware memory bandwidth. For low-cardinality data, there is another type of encoding that can be more optimal: BYTEDICT. This encoding uses a dictionary-encoding scheme that allows the database engine to operate directly over compressed data without the need to decompress it first.

To further improve price-performance for string-heavy workloads, Amazon Redshift is now introducing additional performance enhancements that speed up scans and predicate evaluations, over low-cardinality string columns that are encoded as BYTEDICT, between 5–63 times faster (see results in the next section) compared to alternative compression encodings such as LZO or ZSTD. Amazon Redshift achieves this performance improvement by vectorizing scans over lightweight, CPU-efficient, BYTEDICT-encoded, low-cardinality string columns. These string-processing optimizations make effective use of memory bandwidth afforded by modern hardware, enabling real-time analytics over string data. These newly introduced performance capabilities are optimal for low-cardinality string columns (up to a few hundred unique string values).

You can automatically benefit from this new high performance string enhancement by enabling automatic table optimization in your Amazon Redshift data warehouse. If you don’t have automatic table optimization enabled on your tables, you can receive recommendations from the Amazon Redshift Advisor in the Amazon Redshift console on a string column’s suitability for BYTEDICT encoding. You can also define new tables that have low-cardinality string columns with BYTEDICT encoding. String enhancements in Amazon Redshift are now available in all AWS Regions where Amazon Redshift is available.

Performance results

To measure the performance impact of our string enhancements, we generated a 10TB (Tera Byte) dataset that consisted of low-cardinality string data. We generated three versions of the data using short, medium, and long strings, corresponding to the 25th, 50th, and 75th percentile of string lengths from Amazon Redshift fleet telemetry. We loaded this data into Amazon Redshift twice, encoding it in one case using LZO compression and in another using BYTEDICT compression. Finally, we measured the performance of scan-heavy queries that return many rows (90% of the table), a medium number of rows (50% of the table), and a few rows (1% of the table) over these low-cardinality string datasets. The performance results are summarized in the following chart.

Queries with predicates that match a high percentage of rows saw improvements of 5–30 times with the new vectorized BYTEDICT encoding compared to LZO, whereas queries with predicates that match a low percentage of rows saw improvements of 10–63 times in this internal benchmark.

Redshift Serverless price-performance

In addition to the high-concurrency performance results presented in this post, we also used the TPC-DS-derived Cloud Data Warehouse benchmark to compare the price-performance of Redshift Serverless to other data warehouses using a larger 3TB dataset. We chose data warehouses that were priced similarly, in this case within 10% of $32 per hour using publicly available on-demand pricing. These results show that, like Amazon Redshift RA3 instances, Redshift Serverless delivers better price-performance compared to other leading cloud data warehouses. As always, these results can be replicated by using our SQL scripts in our GitHub repository.

We encourage you to try Amazon Redshift using your own proof of concept workloads as the best way to see how Amazon Redshift can meet your data analytics needs.

Find the best price-performance for your workloads

The benchmarks used in this post are derived from the industry-standard TPC-DS benchmark, and have the following characteristics:

  • The schema and data are used unmodified from TPC-DS.
  • The queries are generated using the official TPC-DS kit with query parameters generated using the default random seed of the TPC-DS kit. TPC-approved query variants are used for a warehouse if the warehouse doesn’t support the SQL dialect of the default TPC-DS query.
  • The test includes the 99 TPC-DS SELECT queries. It doesn’t include maintenance and throughput steps.
  • For the single 3TB concurrency test, three power runs were run, and the best run is taken for each data warehouse.
  • Price-performance for the TPC-DS queries is calculated as cost per hour (USD) times the benchmark runtime in hours, which is equivalent to the cost to run the benchmark. The latest published on-demand pricing is used for all data warehouses and not Reserved Instance pricing as noted earlier.

We call this the Cloud Data Warehouse benchmark, and you can easily reproduce the preceding benchmark results using the scripts, queries, and data available in our GitHub repository. It is derived from the TPC-DS benchmarks as described in this post, and as such is not comparable to published TPC-DS results, because the results of our tests don’t comply with the official specification.

Conclusion

Amazon Redshift is committed to delivering the industry’s best price-performance for the widest variety of workloads. Redshift Serverless scales linearly with the best (lowest) price-performance, supporting hundreds of concurrent users while maintaining consistent query response times. Based on test results discussed in this post, Amazon Redshift has up to 2.6 times better price-performance at the same level of concurrency compared to the nearest competitor (Competitor B). As mentioned earlier, using Reserved Instances with the 3-year all upfront option gives you the lowest cost to run Amazon Redshift, resulting in even better relative price-performance compared to on-demand instance pricing that we used in this post. Our approach to continuous performance improvement involves a unique combination of customer obsession to understand customer use cases and their associated scalability bottlenecks coupled with continuous fleet data analysis to identify opportunities to make significant performance optimizations.

Each workload has unique characteristics, so if you’re just getting started, a proof of concept is the best way to understand how Amazon Redshift can lower your costs while delivering better performance. When running your own proof of concept, it’s important to focus on the right metrics—query throughput (number of queries per hour), response time, and price-performance. You can make a data-driven decision by running a proof of concept on your own or with assistance from AWS or a system integration and consulting partner.

To stay up to date with the latest developments in Amazon Redshift, follow the What’s New in Amazon Redshift feed.


About the authors

Stefan Gromoll is a Senior Performance Engineer with Amazon Redshift team where he is responsible for measuring and improving Redshift performance. In his spare time, he enjoys cooking, playing with his three boys, and chopping firewood.

Ravi Animi is a Senior Product Management leader in the Amazon Redshift team and manages several functional areas of the Amazon Redshift cloud data warehouse service including performance, spatial analytics, streaming ingestion and migration strategies. He has experience with relational databases, multi-dimensional databases, IoT technologies, storage and compute infrastructure services and more recently as a startup founder using AI/deep learning, computer vision, and robotics.

Aamer Shah is a Senior Engineer in the Amazon Redshift Service team.

Sanket Hase is a Software Development Manager in the Amazon Redshift Service team.

Orestis Polychroniou is a Principal Engineer in the Amazon Redshift Service team.

An automated approach to perform an in-place engine upgrade in Amazon OpenSearch Service

Post Syndicated from Prashant Agrawal original https://aws.amazon.com/blogs/big-data/an-automated-approach-to-perform-an-in-place-engine-upgrade-in-amazon-opensearch-service/

Software upgrades bring new features and better performance, and keep you current with the software provider. However, upgrades for software services can be difficult to complete successfully, especially when you can’t tolerate downtime and when the new version’s APIs introduce breaking changes and deprecation that you must remediate. This post shows you how to upgrade from Elasticsearch engine to OpenSearch engine on Amazon OpenSearch Service without needing an intermediate upgrade to Elasticsearch 7.10.

OpenSearch Service supports OpenSearch as an engine, with versions in the 1.x through 2.x series. The service also supports legacy versions of Elasticsearch, versions 1.x through 7.10. Although OpenSearch brings many improvements over earlier engines, it can feel daunting to consider not only upgrading versions, but also changing engines in the process. The good news is that OpenSearch 1.0 is wire compatible with Elasticsearch 7.10, making engine changes straightforward. If you’re running a version of Elasticsearch in the 6.x or early 7.x series on OpenSearch Service, you might think you need to upgrade to Elasticsearch 7.10, and then upgrade to OpenSearch 1.3. However, you can easily upgrade your existing Elasticsearch engine running 6.8, 7.1, 7.2, 7.4, 7.9, and 7.10 in OpenSearch Service to the OpenSearch 1.3 engine.

OpenSearch Service runs a variety of checks before running an actual upgrade:

  • Validation before starting an upgrade
  • Preparing the setup configuration for the desired version
  • Provisioning new nodes with the same hardware configuration
  • Moving shards from old nodes to newly provisioned nodes
  • Removing older nodes and old node references from OpenSearch endpoints

During an upgrade, AWS takes care of the undifferentiated heavy lifting of provisioning, deploying, and moving the data to new domain. You are responsible to make sure there are no breaking changes that affect the data migration and movement to the newer version of the OpenSearch domain. In this post, we discuss the things you must modify and verify before and after running an upgrade from 6.8, 7.1, 7.2, 7.4, 7.9, and 7.10 version of Elasticsearch to 1.3 OpenSearch Service.

Pre-upgrade breaking changes

The following are pre-upgrade breaking changes:

  • Dependency check for language clients and libraries – If you’re using the open-source high-level language clients from Elastic, for example the Java, go, or Python client libraries, AWS recommends moving to the open-source, OpenSearch versions of these clients. (If you don’t use a high-level language client, you can skip this step.) The following are a few steps to perform a dependency check:
    • Determine the client library – Choose an appropriate client library compatible with your programing language. Refer to OpenSearch language clients for a list of all supported client libraries.
    • Add dependencies and resolve conflicts – Update your project’s dependency management system with the necessary dependencies specified by the client library. If your project already has dependencies that conflict with the OpenSearch client library dependencies, you may encounter dependency conflicts. In such cases, you need to resolve the conflicts manually.
    • Test and verify the client – Test the OpenSearch client functionality by establishing a connection, performing some basic operations (like indexing and searching), and verifying the results.
  • Removal of mapping types – Multiple types within an index were deprecated in Elasticsearch version 6.x, and completely removed in version 7.0 or later. OpenSearch indexes can only contain one mapping type. From OpenSearch version 2.x onward, the mapping _type must be _doc. You must check and fix the mapping before upgrading to OpenSearch 1.3.

Complete the following steps to identify and fix mapping issues:

  1. Navigate to dev tools and use the following GET <index> mapping API to fetch the mapping information for all the indexes:
GET /index-name/_mapping

The mapping response will contain a JSON structure that represents the mapping for your index.

  1. Look for the top-level keys in the response JSON; each key represents a custom type within the index.

The _doc type is used for the default type in Elasticsearch 7.x and OpenSearch Service 1.x, but you may see additional types that you defined in earlier versions of Elasticsearch. The following is an example response for an index with two custom types, type1 and type2.

Note that indexes created in 5.x will continue to function in 6.x as they did in 5.x, but indexes created in 6.x only allow a single type per index.

{
  "myindex": {
    "mappings": {
      "type1": {
        "properties": {
          "field1_type1": {
            "type": "text"
          },
          "field2_type1": {
            "type": "integer"
          }
        }
      },
      "type2": {
        "properties": {
          "field1_type2": {
            "type": "keyword"
          },
          "field2_type2": {
            "type": "date"
          }
        }
      }
    }
  }
}

To fix the multiple mapping types in your existing domain, you need to reindex the data, where you can create one index for each mapping. This is a crucial step in the migration process because OpenSearch doesn’t support multiple types within a single index. In the next steps, we convert an index that has multiple mapping types into two separate indexes, each using the _doc type.

  1. You can unify the mapping by using your existing index name as a root and adding the type as a suffix. For example, the following code creates two indexes with myindex as the root name and type1 and type2 as the suffix:
    # Create an index for "type1"
    PUT /myindex_type1
    
    # Create an index for "type2"
    PUT /myindex_type2

  2. Use the _reindex API to reindex the data from the original index into the two new indexes. Alternately, you can reload the data from its source, if you’re keeping it in another system.
    POST _reindex
    {
      "source": {
        "index": "myindex",
        "type": "type1"  
      },
      "dest": {
        "index": "myindex_type1",
        "type": "_doc"  
      }
    }
    POST _reindex
    {
      "source": {
        "index": "myindex",
        "type": "type2"  
      },
      "dest": {
        "index": "myindex_type2",
        "type": "_doc"  
      }
    }

  3. If your application was previously querying the original index with multiple types, you’ll need to update your queries to specify the new indexes with _doc as the type. For example, if your client was querying using myindex, which has been reindexed to myindex_type1 and myindex_type2, then change your clients to point to myindex*, which will query across both indexes.
  4. After you have verified that the data is successfully reindexed and your application is working as expected with the new indexes, you should delete the original index before starting the upgrade, because it won’t be supported in the new version. Be cautious when deleting data and make sure you have backups if necessary.
  5. As part of this upgrade, Kibana will be replaced with OpenSearch Dashboards. When you’re done with the upgrade in the next step, you should advocate your users to use the new endpoint, which will be _dashboards. If you use a custom endpoint, be sure to update it to point to /_dashboards.
  6. We recommend that you update your AWS Identity and Access Management (IAM) policies to use the renamed API operations. However, OpenSearch Service will continue to respect existing policies by internally replicating the old API permissions. Service control policies (SCPs) introduce an additional layer of complexity compared to standard IAM. To prevent your SCP policies from breaking, you need to add both the old and the new API operations to each of your SCP policies.

Start the upgrade

The upgrade process is irreversible and can’t be paused or cancelled. You should make sure that everything will go smoothly by running a proof of concept (POC) check. By building a POC, you ensure data preservation, avoid compatibility issues, prevent unwanted bugs, and mitigate risks. You can run a POC with a small domain on the new version, and with a small subset of your data. Deploy and run any front-end code that communicates with OpenSearch Service via API against your test domain. Use your ingest pipeline to send data to your test domain as well, ensuring nothing breaks. Import or rebuild dashboards, alerts, anomaly detectors, and so on. This simple precaution can make your upgrade experience smoother and trouble-free while minimizing potential disruptions.

When OpenSearch Service starts the upgrade, it can take from 15 minutes to several hours to complete. OpenSearch Dashboards might be unavailable during some or all of the duration of the upgrade.

In this section, we show how you can start an upgrade using the AWS Management Console. You can also run the upgrade using the AWS Command Line Interface (AWS CLI) or AWS SDK. For more information, refer to Starting an upgrade (CLI) and Starting an upgrade (SDK).

  1. Take a manual snapshot of your domain.

This snapshot serves as a backup that you can restore on a new domain if you want to return to using the prior version.

  1. On the OpenSearch Service console, choose the domain that you want to upgrade.
  2. On the Actions menu, choose Upgrade.
  3. Select the target version as OpenSearch 1.3. If you’re upgrading to an OpenSearch version, then you’ll be able to enable compatibility mode. If you enable this setting, OpenSearch reports its version as 7.10 to allow Elastic’s open-source clients and plugins like Logstash to continue working with your OpenSearch Service domain.
  4. Choose Check Upgrade Eligibility, which helps you identify if there are any breaking changes you still need to fix before running an upgrade.
  5. Choose Upgrade.
  6. Check the status on the domain dashboard to monitor the status of the upgrade.

The following graphic gives a quick demonstration of running and monitoring the upgrade via the preceding steps.

Post-upgrade changes and validations

Now that you have successfully upgraded your domain to OpenSearch Service 1.3, be sure to make the following changes:

  • Custom endpoint – A custom endpoint for your OpenSearch Service domain makes it straightforward for you to refer to your OpenSearch and OpenSearch Dashboards URLs. You can include your company’s branding or just use a shorter, easier-to-remember endpoint than the standard one. In OpenSearch Service, Kibana has been renamed to dashboards. After you upgrade a domain from the Elasticsearch engine to the OpenSearch engine, the /_plugin/kibana endpoint changes to /_dashboards. If you use the Kibana endpoint to set up a custom domain, you need to update it to include the new /_dashboards endpoint.
  • SAML authentication for OpenSearch Dashboards – After you upgrade your domain to OpenSearch Service, you need to change all Kibana URLs configured in your identity provider (IdP) from /_plugin/kibana to /_dashboards. The most common URLs are assertion consumer service (ACS) URLs and recipient URLs.

Conclusion

This post discussed what to consider when planning to upgrade your existing Elasticsearch engine in your OpenSearch Service domain to the OpenSearch engine. OpenSearch Service continues to support older engine versions, including open-source versions of Elasticsearch from 1.x through 7.10. By migrating forward to OpenSearch Service, you will be able to take advantage of bug fixes, performance improvements, new service features, and expanded instance types for your data nodes, like AWS Graviton 2 instances.

If you have feedback about this post, share it in the comments section. If you have questions about this post, start a new thread on the Amazon OpenSearch Service forum or contact AWS Support.


About the Authors

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Harsh Bansal is a Solutions Architect at AWS with Analytics as his area of specialty.He has been building solutions to help organizations make data-driven decisions.

Enable cost-efficient operational analytics with Amazon OpenSearch Ingestion

Post Syndicated from Mikhail Vaynshteyn original https://aws.amazon.com/blogs/big-data/enable-cost-efficient-operational-analytics-with-amazon-opensearch-ingestion/

As the scale and complexity of microservices and distributed applications continues to expand, customers are seeking guidance for building cost-efficient infrastructure supporting operational analytics use cases. Operational analytics is a popular use case with Amazon OpenSearch Service. A few of the defining characteristics of these use cases are ingesting a high volume of time series data and a relatively low volume of querying, alerting, and running analytics on ingested data for real-time insights. Although OpenSearch Service is capable of ingesting petabytes of data across storage tiers, you still have to provision capacity to migrate between hot and warm tiers. This adds to the cost of provisioned OpenSearch Service domains.

The time series data often contains logs or telemetry data from various sources with different values and needs. That is, logs from some sources need to be available in a hot storage tier longer, whereas logs from other sources can tolerate a delay in querying and other requirements. Until now, customers were building external ingestion systems with the Amazon Kinesis family of services, Amazon Simple Queue Service (Amazon SQS), AWS Lambda, custom code, and other similar solutions. Although these solutions enable ingestion of operational data with various requirements, they add to the cost of ingestion.

In general, operational analytics workloads use anomaly detection to aid domain operations. This assumes that the data is already present in OpenSearch Service and the cost of ingestion is already borne.

With the addition of a few recent features of Amazon OpenSearch Ingestion, a fully managed serverless pipeline for OpenSearch Service, you can effectively address each of these cost points and build a cost-effective solution. In this post, we outline a solution that does the following:

  • Uses conditional routing of Amazon OpenSearch Ingestion to separate logs with specific attributes and store those, for example, in Amazon OpenSearch Service and archive all events in Amazon S3 to query with Amazon Athena
  • Uses in-stream anomaly detection with OpenSearch Ingestion, thereby removing the cost associated with compute needed for anomaly detection after ingestion

In this post, we use a VPC flow logs use case to demonstrate the solution. The solution and pattern presented in this post is equally applicable to larger operational analytics and observability use cases.

Solution overview

We use VPC flow logs to capture IP traffic and trigger processing notifications to the OpenSearch Ingestion pipeline. The pipeline filters the data, routes the data, and detects anomalies. The raw data will be stored in Amazon S3 for archival purposes, then the pipeline will detect anomalies in the data in near-real time using the Random Cut Forest (RCF) algorithm and send those data records to OpenSearch Service. The raw data stored in Amazon S3 can be inexpensively retained for an extended period of time using tiered storage and queried using the Athena query engine, and also visualized using Amazon QuickSight or other data visualization services. Although this walkthrough uses VPC flow log data, the same pattern applies for use with AWS CloudTrail, Amazon CloudWatch, any log files as well as any OpenTelemetry events, and custom producers.

The following is a diagram of the solution that we configure in this post.

In the following sections, we provide a walkthrough for configuring this solution.

The patterns and procedures presented in this post have been validated with the current version of OpenSearch Ingestion and the Data Prepper open-source project version 2.4.

Prerequisites

Complete the following prerequisite steps:

  1. We will be using a VPC for demonstration purposes for generating data. Set up the VPC flow logs to publish logs to an S3 bucket in text format. To optimize S3 storage costs, create a lifecycle configuration on the S3 bucket to transition the VPC flow logs to different tiers or expire processed logs. Make a note of the S3 bucket name you configured to use in later steps.
  2. Set up an OpenSearch Service domain. Make a note of the domain URL. The domain can be either public or VPC based, which is the preferred configuration.
  3. Create an S3 bucket for storing archived events, and make a note of S3 bucket name. Configure a resource-based policy allowing OpenSearch Ingestion to archive logs and Athena to read the logs.
  4. Configure an AWS Identity and Access Management (IAM) role or separate IAM roles allowing OpenSearch Ingestion to interact with Amazon SQS and Amazon S3. For instructions, refer to Configure the pipeline role.
  5. Configure Athena or validate that Athena is configured on your account. For instructions, refer to Getting started.

Configure an SQS notification

VPC flow logs will write data in Amazon S3. After each file is written, Amazon S3 will send an SQS notification to notify the OpenSearch Ingestion pipeline that the file is ready for processing.

If the data is already stored in Amazon S3, you can use the S3 scan capability for a one-time or scheduled loading of data through the OpenSearch Ingestion pipeline.

Use AWS CloudShell to issue the following commands to create the SQS queues VpcFlowLogsNotifications and VpcFlowLogsNotifications-DLQ that we use for this walkthrough.

Create a dead-letter queue with the following code

export SQS_DLQ_URL=$(aws sqs create-queue --queue-name VpcFlowLogsNotifications-DLQ | jq -r '.QueueUrl')

echo $SQS_DLQ_URL 

export SQS_DLQ_ARN=$(aws sqs get-queue-attributes --queue-url $SQS_DLQ_URL --attribute-names QueueArn | jq -r '.Attributes.QueueArn') 

echo $SQS_DLQ_ARN

Create an SQS queue to receive events from Amazon S3 with the following code:

export SQS_URL=$(aws sqs create-queue --queue-name VpcFlowLogsNotification --attributes '{
"RedrivePolicy": 
"{\"deadLetterTargetArn\":\"'$SQS_DLQ_ARN'\",\"maxReceiveCount\":\"2\"}", 
"Policy": 
  "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"}, \"Action\":\"SQS:SendMessage\",\"Resource\":\"*\"}]}" 
}' | jq -r '.QueueUrl')

echo $SQS_URL

To configure the S3 bucket to send events to the SQS queue, use the following code (provide the name of your S3 bucket used for storing VPC flow logs):

aws s3api put-bucket-notification-configuration --bucket __BUCKET_NAME__ --notification-configuration '{
     "QueueConfigurations": [
         {
             "QueueArn": "'$SQS_URL'",
             "Events": [
                 "s3:ObjectCreated:*"
             ]
         }
     ]
}'

Create the OpenSearch Ingestion pipeline

Now that you have configured Amazon SQS and the S3 bucket notifications, you can configure the OpenSearch Ingestion pipeline.

  1. On the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create pipeline.

  1. For Pipeline name, enter a name (for this post, we use stream-analytics-pipeline).
  2. For Pipeline configuration, enter the following code:
version: "2"
entry-pipeline:
  source:
     s3:
       notification_type: sqs
       compression: gzip
       codec:
         newline:
       sqs:
         queue_url: "<strong>__SQS_QUEUE_URL__</strong>"
         visibility_timeout: 180s
       aws:
        region: "<strong>__REGION__</strong>"
        sts_role_arn: "<strong>__STS_ROLE_ARN__</strong>"
  
  processor:
  sink:
    - pipeline:
        name: "archive-pipeline"
    - pipeline:
        name: "data-processing-pipeline"

data-processing-pipeline:
    source: 
        pipeline:
            name: "entry-pipeline"
    processor:
    - grok:
        tags_on_match_failure: [ "grok_match_failure" ]
        match:
          message: [ "%{VPC_FLOW_LOG}" ]
    route:
        - icmp_traffic: '/protocol == 1'
    sink:
        - pipeline:
            name : "icmp-pipeline"
            routes:
                - "icmp_traffic"
        - pipeline:
            name: "analytics-pipeline"
    

archive-pipeline:
  source:
    pipeline:
      name: entry-pipeline
  processor:
  sink:
    - s3:
        aws:
          region: "<strong>__REGION__</strong>"
          sts_role_arn: "<strong>__STS_ROLE_ARN__</strong>"
        max_retries: 16
        bucket: "<strong>__AWS_S3_BUCKET_ARCHIVE__</strong>"
        object_key:
          path_prefix: "vpc-flow-logs-archive/year=%{yyyy}/month=%{MM}/day=%{dd}/"
        threshold:
          maximum_size: 50mb
          event_collect_timeout: 300s
        codec:
          parquet:
            auto_schema: true
      
analytics-pipeline:
  source:
    pipeline:
      name: "data-processing-pipeline"
  processor:
    - drop_events:
        drop_when: "hasTags(\"grok_match_failure\") or \"/log-status\" == \"NODATA\""
    - date:
        from_time_received: true
        destination: "@timestamp"
    - aggregate:
        identification_keys: ["srcaddr", "dstaddr"]
        action:
          tail_sampler:
            percent: 20.0
            wait_period: "60s"
            condition: '/action != "ACCEPT"'
    - anomaly_detector:
        identification_keys: ["srcaddr","dstaddr"]
        keys: ["bytes"]
        verbose: true
        mode:
          random_cut_forest:
  sink:
    - opensearch:
        hosts: [ "<strong>__AMAZON_OPENSEARCH_DOMAIN_URL__</strong>" ]
        index: "flow-logs-anomalies"
        aws:
          sts_role_arn: "<strong>__STS_ROLE_ARN__</strong>"
          region: "<strong>__REGION__</strong>"
          
icmp-pipeline:
  source:
    pipeline:
      name: "data-processing-pipeline"
  processor:
  sink:
    - opensearch:
        hosts: [ "<strong>__AMAZON_OPENSEARCH_DOMAIN_URL__</strong>" ]
        index: "sensitive-icmp-traffic"
        aws:
          sts_role_arn: "<strong>__STS_ROLE_ARN__</strong>"
          region: "<strong>__REGION__</strong>"</code>

Replace the variables in the preceding code with resources in your account:

    • __SQS_QUEUE_URL__ – URL of Amazon SQS for Amazon S3 events
    • __STS_ROLE_ARN__AWS Security Token Service (AWS STS) roles for resources to assume
    • __AWS_S3_BUCKET_ARCHIVE__ – S3 bucket for archiving processed events
    • __AMAZON_OPENSEARCH_DOMAIN_URL__ – URL of OpenSearch Service domain
    • __REGION__ – Region (for example, us-east-1)
  1. In the Network settings section, specify your network access. For this walkthrough, we are using VPC access. We provided the VPC and private subnet locations that have connectivity with the OpenSearch Service domain and security groups.
  2. Leave the other settings with default values, and choose Next.
  3. Review the configuration changes and choose Create pipeline.

It will take a few minutes for OpenSearch Service to provision the environment. While the environment is being provisioned, we’ll walk you through the pipeline configuration. Entry-pipeline listens for SQS notifications about newly arrived files and triggers the reading of VPC flow log compressed files:

…
entry-pipeline:
  source:
     s3:
…

The pipeline branches into two sub-pipelines. The first stores original messages for archival purposes in Amazon S3 in read-optimized Parquet format; the other applies analytics routes events to the OpenSearch Service domain for fast querying and alerting:

…
  sink:
    - pipeline:
        name: "archive-pipeline"
    - pipeline:
        name: "data-processing-pipeline"
… 

The pipeline archive-pipeline aggregates messages in 50 MB chunks or every 60 seconds and writes a Parquet file to Amazon S3 with the schema inferred from the message. Also, a prefix is added to help with partitioning and query optimization when reading a collection of files using Athena.

…
sink:
    - s3:
…
        object_key:
          path_prefix: " vpc-flow-logs-archive/year=%{yyyy}/month=%{MM}/day=%{dd}/"
        threshold:
          maximum_size: 50mb
          event_collect_timeout: 300s
        codec:
          parquet:
            auto_schema: true
…

Now that we have reviewed the basics, we focus on the pipeline that detects anomalies and sends only high-value messages that deviate from the norm to OpenSearch Service. It also stores Internet Control Message Protocols (ICMP) messages in OpenSearch Service.

We applied a grok processor to parse the message using a predefined regex for parsing VPC flow logs, and also tagged all unparsable messages with the grok_match_failure tag, which we use to remove headers and other records that can’t be parsed:

…
    processor:
    - grok:
        tags_on_match_failure: [ "grok_match_failure" ]
        match:
          message: [ "%{VPC_FLOW_LOG}" ]
…

We then routed all messages with the protocol identifier 1 (ICMP) to icmp-pipeline and all messages to analytics-pipeline for anomaly detection:

…
   route:
        - icmp_traffic: '/protocol == 1'
    sink:
        - pipeline:
            name : "icmp-pipeline"
            routes:
                - "icmp_traffic"
        - pipeline:
            name: "analytics-pipeline"
…

In the analytics pipeline, we dropped all records that can’t be parsed using the hasTags method based on the tag that we assigned at the time of parsing. We also removed all records that don’t contains useful data for anomaly detection.

…
  - drop_events:
        drop_when: "hasTags(\"grok_match_failure\") or \"/log-status\" == \"NODATA\""		
…

Then we applied probabilistic sampling using the tail_sampler processor for all accepted messages grouped by source and destination addresses and sent those to the sink with all messages that were not accepted. This helps reduce the volume of messages within the selected cardinality keys, with a focus on all messages that weren’t accepted, and keeps a sample representation of messages that were accepted.

…
aggregate:
        identification_keys: ["srcaddr", "dstaddr"]
        action:
          tail_sampler:
            percent: 20.0
            wait_period: "60s"
            condition: '/action != "ACCEPT"'
…

Then we used the anomaly detector processor to identify anomalies within the cardinality key pairs or source and destination addresses in our example. The anomaly detector processor creates and trains RCF models for a hashed value of keys, then uses those models to determine whether newly arriving messages have an anomaly based on the trained data. In our demonstration, we use bytes data to detect anomalies:

…
anomaly_detector:
        identification_keys: ["srcaddr","dstaddr"]
        keys: ["bytes"]
        verbose: true
        mode:
          random_cut_forest:
…

We set verbose:true to instruct the detector to emit the message every time an anomaly is detected. Also, for this walkthrough, we used a non-default sample_size for training the model.

When anomalies are detected, the anomaly detector returns a complete record and adds

"deviation_from_expected":value,"grade":value attributes that signify the deviation value and severity of the anomaly. These values can be used to determine routing of such messages to OpenSearch Service, and use per-document monitoring capabilities in OpenSearch Service to alert on specific conditions.

Currently, OpenSearch Ingestion creates up to 5,000 distinct models based on cardinality key values per compute unit. This limit is observed using the anomaly_detector.RCFInstances.value CloudWatch metric. It’s important to select a cardinality key-value pair to avoid exceeding this constraint. As development of the Data Prepper open-source project and OpenSearch Ingestion continues, more configuration options will be added to offer greater flexibility around model training and memory management.

The OpenSearch Ingestion pipeline exposes the anomaly_detector.cardinalityOverflow.count metric through CloudWatch. This metric indicates a number of key value pairs that weren’t run by the anomaly detection processor during a period of time as the maximum number of RCFInstances per compute unit was reached. To avoid this constraint, a number of compute units can be scaled out to provide additional capacity for hosting additional instances of RCFInstances.

In the last sink, the pipeline writes records with detected anomalies along with deviation_from_expected and grade attributes to the OpenSearch Service domain:

…
sink:
    - opensearch:
        hosts: [ "__AMAZON_OPENSEARCH_DOMAIN_URL__" ]
        index: "anomalies"
…

Because only anomaly records are being routed and written to the OpenSearch Service domain, we are able to significantly reduce the size of our domain and optimize the cost of our sample observability infrastructure.

Another sink was used for storing all ICMP records in a separate index in the OpenSearch Service domain:

…
sink:
    - opensearch:
        hosts: [ "__AMAZON_OPENSEARCH_DOMAIN_URL__" ]
        index: " sensitive-icmp-traffic"
…

Query archived data from Amazon S3 using Athena

In this section, we review the configuration of Athena for querying archived events data stored in Amazon S3. Complete the following steps:

  1. Navigate to the Athena query editor and create a new database called vpc-flow-logs-archive-database using the following command:
CREATE DATABASE `vpc-flow-logs-archive`
  1. 2. On the Database menu, choose vpc-flow-logs-archive.
  2. In the query editor, enter the following command to create a table (provide the S3 bucket used for archiving processed events). For simplicity, for this walkthrough, we create a table without partitions.
CREATE EXTERNAL TABLE `vpc-flow-logs-data`(
  `message` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://__AWS_S3_BUCKET_ARCHIVE__'
TBLPROPERTIES (
  'classification'='parquet', 
  'compressionType'='none'
)
  1. Run the following query to validate that you can query the archived VPC flow log data:
SELECT * FROM "vpc-flow-logs-archive"."vpc-flow-logs-data" LIMIT 10;

Because archived data is stored in its original format, it helps avoid issues related to format conversion. Athena will query and display records in the original format. However, it’s ideal to interact only with a subset of columns or parts of the messages. You can use the regexp_split function in Athena to split the message in the columns and retrieve certain columns. Run the following query to see the source and destination address groupings from the VPC flow log data:

SELECT srcaddr, dstaddr FROM (
   SELECT regexp_split(message, ' ')[4] AS srcaddr, 
          regexp_split(message, ' ')[5] AS dstaddr, 
          regexp_split(message, ' ')[14] AS status  FROM "vpc-flow-logs-archive"."vpc-flow-logs-data" 
) WHERE status = 'OK' 
GROUP BY srcaddr, dstaddr 
ORDER BY srcaddr, dstaddr LIMIT 10;

This demonstrated that you can query all events using Athena, where archived data in its original raw format is used for the analysis. Athena is priced per data scanned. Because the data is stored in a read-optimized format and partitioned, it enables further cost-optimization around on-demand querying of archived streaming and observability data.

Clean up

To avoid incurring future charges, delete the following resources created as part of this post:

  • OpenSearch Service domain
  • OpenSearch Ingestion pipeline
  • SQS queues
  • VPC flow logs configuration
  • All data stored in Amazon S3

Conclusion

In this post, we demonstrated how to use OpenSearch Ingestion pipelines to build a cost-optimized infrastructure for log analytics and observability events. We used routing, filtering, aggregation, and anomaly detection in an OpenSearch Ingestion pipeline, enabling you to downsize your OpenSearch Service domain and create a cost-optimized observability infrastructure. For our example, we used a data sample with 1.5 million events with a pipeline distilling to 1,300 events with predicted anomalies based on source and destination IP pairs. This metric demonstrates that the pipeline identified that less than 0.1% of events were of high importance, and routed those to OpenSearch Service for visualization and alerting needs. This translates to lower resource utilization in OpenSearch Service domains and can lead to provisioning of smaller OpenSearch Service environments.

We encourage you to use OpenSearch Ingestion pipelines to create your purpose-built and cost-optimized observability infrastructure that uses OpenSearch Service for storing and alerting on high-value events. If you have comments or feedback, please leave them in the comments section.


About the Authors

Mikhail Vaynshteyn is a Solutions Architect with Amazon Web Services. Mikhail works with healthcare and life sciences customers to build solutions that help improve patients’ outcomes. Mikhail specializes in data analytics services.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Unstructured data management and governance using AWS AI/ML and analytics services

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/unstructured-data-management-and-governance-using-aws-ai-ml-and-analytics-services/

Unstructured data is information that doesn’t conform to a predefined schema or isn’t organized according to a preset data model. Unstructured information may have a little or a lot of structure but in ways that are unexpected or inconsistent. Text, images, audio, and videos are common examples of unstructured data. Most companies produce and consume unstructured data such as documents, emails, web pages, engagement center phone calls, and social media. By some estimates, unstructured data can make up to 80–90% of all new enterprise data and is growing many times faster than structured data. After decades of digitizing everything in your enterprise, you may have an enormous amount of data, but with dormant value. However, with the help of AI and machine learning (ML), new software tools are now available to unearth the value of unstructured data.

In this post, we discuss how AWS can help you successfully address the challenges of extracting insights from unstructured data. We discuss various design patterns and architectures for extracting and cataloging valuable insights from unstructured data using AWS. Additionally, we show how to use AWS AI/ML services for analyzing unstructured data.

Why it’s challenging to process and manage unstructured data

Unstructured data makes up a large proportion of the data in the enterprise that can’t be stored in a traditional relational database management systems (RDBMS). Understanding the data, categorizing it, storing it, and extracting insights from it can be challenging. In addition, identifying incremental changes requires specialized patterns and detecting sensitive data and meeting compliance requirements calls for sophisticated functions. It can be difficult to integrate unstructured data with structured data from existing information systems. Some view structured and unstructured data as apples and oranges, instead of being complementary. But most important of all, the assumed dormant value in the unstructured data is a question mark, which can only be answered after these sophisticated techniques have been applied. Therefore, there is a need to being able to analyze and extract value from the data economically and flexibly.

Solution overview

Data and metadata discovery is one of the primary requirements in data analytics, where data consumers explore what data is available and in what format, and then consume or query it for analysis. If you can apply a schema on top of the dataset, then it’s straightforward to query because you can load the data into a database or impose a virtual table schema for querying. But in the case of unstructured data, metadata discovery is challenging because the raw data isn’t easily readable.

You can integrate different technologies or tools to build a solution. In this post, we explain how to integrate different AWS services to provide an end-to-end solution that includes data extraction, management, and governance.

The solution integrates data in three tiers. The first is the raw input data that gets ingested by source systems, the second is the output data that gets extracted from input data using AI, and the third is the metadata layer that maintains a relationship between them for data discovery.

The following is a high-level architecture of the solution we can build to process the unstructured data, assuming the input data is being ingested to the raw input object store.

Unstructured Data Management - Block Level Architecture Diagram

The steps of the workflow are as follows:

  1. Integrated AI services extract data from the unstructured data.
  2. These services write the output to a data lake.
  3. A metadata layer helps build the relationship between the raw data and AI extracted output. When the data and metadata are available for end-users, we can break the user access pattern into additional steps.
  4. In the metadata catalog discovery step, we can use query engines to access the metadata for discovery and apply filters as per our analytics needs. Then we move to the next stage of accessing the actual data extracted from the raw unstructured data.
  5. The end-user accesses the output of the AI services and uses the query engines to query the structured data available in the data lake. We can optionally integrate additional tools that help control access and provide governance.
  6. There might be scenarios where, after accessing the AI extracted output, the end-user wants to access the original raw object (such as media files) for further analysis. Additionally, we need to make sure we have access control policies so the end-user has access only to the respective raw data they want to access.

Now that we understand the high-level architecture, let’s discuss what AWS services we can integrate in each step of the architecture to provide an end-to-end solution.

The following diagram is the enhanced version of our solution architecture, where we have integrated AWS services.

Unstructured Data Management - AWS Native Architecture

Let’s understand how these AWS services are integrated in detail. We have divided the steps into two broad user flows: data processing and metadata enrichment (Steps 1–3) and end-users accessing the data and metadata with fine-grained access control (Steps 4–6).

  1. Various AI services (which we discuss in the next section) extract data from the unstructured datasets.
  2. The output is written to an Amazon Simple Storage Service (Amazon S3) bucket (labeled Extracted JSON in the preceding diagram). Optionally, we can restructure the input raw objects for better partitioning, which can help while implementing fine-grained access control on the raw input data (labeled as the Partitioned bucket in the diagram).
  3. After the initial data extraction phase, we can apply additional transformations to enrich the datasets using AWS Glue. We also build an additional metadata layer, which maintains a relationship between the raw S3 object path, the AI extracted output path, the optional enriched version S3 path, and any other metadata that will help the end-user discover the data.
  4. In the metadata catalog discovery step, we use the AWS Glue Data Catalog as the technical catalog, Amazon Athena and Amazon Redshift Spectrum as query engines, AWS Lake Formation for fine-grained access control, and Amazon DataZone for additional governance.
  5. The AI extracted output is expected to be available as a delimited file or in JSON format. We can create an AWS Glue Data Catalog table for querying using Athena or Redshift Spectrum. Like the previous step, we can use Lake Formation policies for fine-grained access control.
  6. Lastly, the end-user accesses the raw unstructured data available in Amazon S3 for further analysis. We have proposed integrating Amazon S3 Access Points for access control at this layer. We explain this in detail later in this post.

Now let’s expand the following parts of the architecture to understand the implementation better:

  • Using AWS AI services to process unstructured data
  • Using S3 Access Points to integrate access control on raw S3 unstructured data

Process unstructured data with AWS AI services

As we discussed earlier, unstructured data can come in a variety of formats, such as text, audio, video, and images, and each type of data requires a different approach for extracting metadata. AWS AI services are designed to extract metadata from different types of unstructured data. The following are the most commonly used services for unstructured data processing:

  • Amazon Comprehend – This natural language processing (NLP) service uses ML to extract metadata from text data. It can analyze text in multiple languages, detect entities, extract key phrases, determine sentiment, and more. With Amazon Comprehend, you can easily gain insights from large volumes of text data such as extracting product entity, customer name, and sentiment from social media posts.
  • Amazon Transcribe – This speech-to-text service uses ML to convert speech to text and extract metadata from audio data. It can recognize multiple speakers, transcribe conversations, identify keywords, and more. With Amazon Transcribe, you can convert unstructured data such as customer support recordings into text and further derive insights from it.
  • Amazon Rekognition – This image and video analysis service uses ML to extract metadata from visual data. It can recognize objects, people, faces, and text, detect inappropriate content, and more. With Amazon Rekognition, you can easily analyze images and videos to gain insights such as identifying entity type (human or other) and identifying if the person is a known celebrity in an image.
  • Amazon Textract – You can use this ML service to extract metadata from scanned documents and images. It can extract text, tables, and forms from images, PDFs, and scanned documents. With Amazon Textract, you can digitize documents and extract data such as customer name, product name, product price, and date from an invoice.
  • Amazon SageMaker – This service enables you to build and deploy custom ML models for a wide range of use cases, including extracting metadata from unstructured data. With SageMaker, you can build custom models that are tailored to your specific needs, which can be particularly useful for extracting metadata from unstructured data that requires a high degree of accuracy or domain-specific knowledge.
  • Amazon Bedrock – This fully managed service offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Stability AI, and Amazon with a single API. It also offers a broad set of capabilities to build generative AI applications, simplifying development while maintaining privacy and security.

With these specialized AI services, you can efficiently extract metadata from unstructured data and use it for further analysis and insights. It’s important to note that each service has its own strengths and limitations, and choosing the right service for your specific use case is critical for achieving accurate and reliable results.

AWS AI services are available via various APIs, which enables you to integrate AI capabilities into your applications and workflows. AWS Step Functions is a serverless workflow service that allows you to coordinate and orchestrate multiple AWS services, including AI services, into a single workflow. This can be particularly useful when you need to process large amounts of unstructured data and perform multiple AI-related tasks, such as text analysis, image recognition, and NLP.

With Step Functions and AWS Lambda functions, you can create sophisticated workflows that include AI services and other AWS services. For instance, you can use Amazon S3 to store input data, invoke a Lambda function to trigger an Amazon Transcribe job to transcribe an audio file, and use the output to trigger an Amazon Comprehend analysis job to generate sentiment metadata for the transcribed text. This enables you to create complex, multi-step workflows that are straightforward to manage, scalable, and cost-effective.

The following is an example architecture that shows how Step Functions can help invoke AWS AI services using Lambda functions.

AWS AI Services - Lambda Event Workflow -Unstructured Data

The workflow steps are as follows:

  1. Unstructured data, such as text files, audio files, and video files, are ingested into the S3 raw bucket.
  2. A Lambda function is triggered to read the data from the S3 bucket and call Step Functions to orchestrate the workflow required to extract the metadata.
  3. The Step Functions workflow checks the type of file, calls the corresponding AWS AI service APIs, checks the job status, and performs any postprocessing required on the output.
  4. AWS AI services can be accessed via APIs and invoked as batch jobs. To extract metadata from different types of unstructured data, you can use multiple AI services in sequence, with each service processing the corresponding file type.
  5. After the Step Functions workflow completes the metadata extraction process and performs any required postprocessing, the resulting output is stored in an S3 bucket for cataloging.

Next, let’s understand how can we implement security or access control on both the extracted output as well as the raw input objects.

Implement access control on raw and processed data in Amazon S3

We just consider access controls for three types of data when managing unstructured data: the AI-extracted semi-structured output, the metadata, and the raw unstructured original files. When it comes to AI extracted output, it’s in JSON format and can be restricted via Lake Formation and Amazon DataZone. We recommend keeping the metadata (information that captures which unstructured datasets are already processed by the pipeline and available for analysis) open to your organization, which will enable metadata discovery across the organization.

To control access of raw unstructured data, you can integrate S3 Access Points and explore additional support in the future as AWS services evolve. S3 Access Points simplify data access for any AWS service or customer application that stores data in Amazon S3. Access points are named network endpoints that are attached to buckets that you can use to perform S3 object operations. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy that is attached to the underlying bucket. With S3 Access Points, you can create unique access control policies for each access point to easily control access to specific datasets within an S3 bucket. This works well in multi-tenant or shared bucket scenarios where users or teams are assigned to unique prefixes within one S3 bucket.

An access point can support a single user or application, or groups of users or applications within and across accounts, allowing separate management of each access point. Every access point is associated with a single bucket and contains a network origin control and a Block Public Access control. For example, you can create an access point with a network origin control that only permits storage access from your virtual private cloud (VPC), a logically isolated section of the AWS Cloud. You can also create an access point with the access point policy configured to only allow access to objects with a defined prefix or to objects with specific tags. You can also configure custom Block Public Access settings for each access point.

The following architecture provides an overview of how an end-user can get access to specific S3 objects by assuming a specific AWS Identity and Access Management (IAM) role. If you have a large number of S3 objects to control access, consider grouping the S3 objects, assigning them tags, and then defining access control by tags.

S3 Access Points - Unstructured Data Management - Access Control

If you are implementing a solution that integrates S3 data available in multiple AWS accounts, you can take advantage of cross-account support for S3 Access Points.

Conclusion

This post explained how you can use AWS AI services to extract readable data from unstructured datasets, build a metadata layer on top of them to allow data discovery, and build an access control mechanism on top of the raw S3 objects and extracted data using Lake Formation, Amazon DataZone, and S3 Access Points.

In addition to AWS AI services, you can also integrate large language models with vector databases to enable semantic or similarity search on top of unstructured datasets. To learn more about how to enable semantic search on unstructured data by integrating Amazon OpenSearch Service as a vector database, refer to Try semantic search with the Amazon OpenSearch Service vector engine.

As of writing this post, S3 Access Points is one of the best solutions to implement access control on raw S3 objects using tagging, but as AWS service features evolve in the future, you can explore alternative options as well.


About the Authors

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Bhavana Chirumamilla is a Senior Resident Architect at AWS with a strong passion for data and machine learning operations. She brings a wealth of experience and enthusiasm to help enterprises build effective data and ML strategies. In her spare time, Bhavana enjoys spending time with her family and engaging in various activities such as traveling, hiking, gardening, and watching documentaries.

Sheela Sonone is a Senior Resident Architect at AWS. She helps AWS customers make informed choices and trade-offs about accelerating their data, analytics, and AI/ML workloads and implementations. In her spare time, she enjoys spending time with her family—usually on tennis courts.

Daniel Bruno is a Principal Resident Architect at AWS. He had been building analytics and machine learning solutions for over 20 years and splits his time helping customers build data science programs and designing impactful ML products.

Simplify Amazon Redshift monitoring using the new unified SYS views

Post Syndicated from Urvish Shah original https://aws.amazon.com/blogs/big-data/simplify-amazon-redshift-monitoring-using-the-new-unified-sys-views/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud, providing up to five times better price-performance than any other cloud data warehouse, with performance innovation out of the box at no additional cost to you. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytics workloads.

In this post, we discuss Amazon Redshift SYS monitoring views and how they simplify the monitoring of your Amazon Redshift workloads and resource usage.

Overview of SYS monitoring views

SYS monitoring views are system views in Amazon Redshift that can be used to monitor query and workload resource usage for provisioned clusters as well as for serverless workgroups. They offer the following benefits:

  • They’re categorized based on functional alignment, considering query state, performance metrics, and query types
  • We have introduced new performance metrics like planning_time, lock_wait_time, remote_read_io, and local_read_io to aid in performance troubleshooting
  • It improves the usability of monitoring views by logging the user-submitted query instead of the Redshift optimizer-rewritten query
  • It provides more troubleshooting metrics using fewer views
  • It enables unified Amazon Redshift monitoring by enabling you to use the same query across provisioned clusters or serverless workgroups

Let’s look at some of the features of SYS monitoring views and how they can be used for monitoring.

Unify various query-level monitoring metrics

The following table shows how you can unify various metrics and information for a query from multiple system tables & views into one SYS monitoring view.

STL/SVL/STV Information element SYS Monitoring View View columns
STL_QUERY elapsed time, query label, user ID, transaction, session, label, stopped queries, database name SYS_QUERY_HISTORY

user_id

query_id

query_label

transaction_id

session_id

database_name

query_type

status

result_cache_hit

start_time

end_time

elapsed_time

queue_time

execution_time

error_message

returned_rows

returned_bytes

query_text

redshift_version

usage_limit

compute_type

compile_time

planning_time

lock_wait_time

STL_WLM_QUERY queue time, runtime
SVL_QLOG result cache
STL_ERROR error code, error message
STL_UTILITYTEXT non-SELECT SQL
STL_DDLTEXT DDL statements
SVL_STATEMENTEXT all types of SQL statements
STL_RETURN return rows and bytes
STL_USAGE_CONTROL usage limit
STV_WLM_QUERY_STATE current state of WLM
STV_RECENTS recent and in-flight queries
STV_INFLIGHT in-flight queries
SVL_COMPILE compilation

For additional information on SYS to STL/SVL/STV mapping, refer to Migrating to SYS monitoring views.

User query-level logging

To enhance query performance, the Redshift query engine can rewrite user-submitted queries. The user-submitted query identifier is different than the rewritten query identifier. We refer to the user-submitted query as the parent query and the rewritten query as the child query in this post.

The following diagram illustrates logging at the parent query level and child query level. The parent query identifier is 1000, and the child query identifiers are 1001, 1002, and 1003.

Query lifecycle timings

SYS_QUERY_HISTORY has an enhanced list of columns to provide granular time metrics relating to the different query lifecycle phases. Note all times are recorded in microseconds. The following table summarizes these metrics.

Time metrics Description
planning_time The time the query spent prior to running the query, which typically includes query lifecycle phases like parse, analyze, planning and rewriting.
lock_wait_time The time the query spent on acquiring the locks on the required database objects referenced.
queue_time The time the query spent in the queue waiting for resources to be available to run.
compile_time The time the query spent compiling.
execution_time The time the query spent running. In the case of a SELECT query, this also includes the return time.
elapsed_time The end-to-end time of the query run.

Solution overview

We discuss the following scenarios to help gain familiarity with the SYS monitoring views:

  • Workload and query lifecycle monitoring
  • Data ingestion monitoring
  • External query monitoring
  • Slow query performance troubleshooting

Prerequisites

You should have the following prerequisites to follow along with the examples in this post:

Additionally, download all the SQL queries that are referenced in this post as Redshift Query Editor v2 SQL notebooks.

Workload and query lifecycle monitoring

In this section, we discuss how to monitor the workload and query lifecycle.

Identify in-flight queries

SYS_QUERY_HISTORY provides a singular view to look at all the in-flight queries as well as historical runs. See the following example query:

SELECT  
  *
FROM    
  sys_query_history
WHERE    status IN ('planning', 'queued', 'running', 'returning')
ORDER BY
  start_time;

We get the following output.

Identify top long-running queries

The following query helps retrieve the top 100 queries that are taking the longest to run. Analyzing (and, if feasible, optimizing) these queries can help improve overall performance. These metrics are accumulated statistics across all runs of the query. Note that all the time values are in microseconds.

--top long running query by elapsed_time
SELECT  
  user_id
  , transaction_id
  , query_id
  , database_name
  , query_type
  , query_text::VARCHAR(100)
  , lock_wait_time
  , planning_time
  , compile_time
  , execution_time
  , elapsed_time
FROM    
  sys_query_history
ORDER BY
  elapsed_time DESC
LIMIT 100;

We get the following output.

Gather daily counts of queries by query types, period, and status

The following query provides insight into the distribution of different types of queries across different days and helps evaluate and track any changes in the workload:

--daily breakdown of workload by query types and status
SELECT  
  DATE_TRUNC('day', start_time) period_daily
  , query_type
  , status
  , COUNT(*)
FROM    
  sys_query_history
GROUP BY
  period_daily
  , query_type
  , status
ORDER BY
  period_daily
  , query_type
  , status;

We get the following output.

Gather run details of an in-flight query

To determine the run-level details of a query that is in-flight, you can use the is_active = ‘t’ filter when querying the SYS_QUERY_DETAIL table. See the following example:

SELECT  
  query_id
  , child_query_sequence
  , stream_id
  , segment_id
  , step_id
  , step_name
  , table_id
  , coalesce(table_name,'')|| coalesce(source,'') as table_name
  , start_time
  , end_time
  , duration
  , blocks_read
  , local_read_io
  , remote_read_io
FROM    
  sys_query_detail
WHERE is_active = 't'
ORDER BY
  query_id
  , child_query_sequence
  , stream_id
  , segment_id
  , step_id;

To view the latest 100 COPY queries run, use the following code:

SELECT  
  session_id
  , transaction_id
  , query_id
  , database_name
  , table_name
  , data_source
  , loaded_rows
  , loaded_bytes
  , duration / 1000.00 duration_ms
FROM    
  sys_load_history
ORDER BY
  start_time DESC LIMIT 100;

We get the following output.

Gather transaction-level details for commits and undo

SYS_TRANSACTION_HISTORY provides transaction-level logging by providing insights into committed transactions with details like blocks committed, status, and isolation level (serializable or snapshot used). It also logs details about the rolled back or undo transactions.

The following screenshots illustrate fetching details about a transaction that was committed successfully.

The following screenshots illustrate fetching details about a transaction that was rolled back.

Stats and vacuum

The SYS_ANALYZE_HISTORY monitoring view provides details like the last timestamp of analyze queries, the duration for which a particular analyze query ran, the number of rows in the table, and the number of rows modified. The following example query provides a list of the latest analyze queries that ran for all the permanent tables:

SELECT  
  TRIM(schema_name) schema_name
  , TRIM(table_name) table_name
  , table_id
  , status
  , COUNT(*) times_analyze_was_triggered
  , MAX(last_analyze_time) last_analyze_time
  , MAX(end_time) end_time
  , AVG(ROWS) "rows"
  , AVG(modified_rows) modified_rows
FROM    
  sys_analyze_history
WHERE
   status != 'Skipped'
GROUP BY
  schema_name
  , table_name
  , table_id
  , status
ORDER BY
  schema_name
  , table_name
  , table_id
  , status
  , end_time;

We get the following output.

The SYS_VACUUM_HISTORY monitoring view provides a complete set of details on VACUUM in a single view. For example, see the following code:

SELECT  
  user_id
  , transaction_id
  , query_id
  , TRIM(database_name) as database_name
  , TRIM(schema_name) as schema_name
  , TRIM(table_name) table_name
  , table_id
  , vacuum_type
  , is_automatic as is_auto
  , duration
  , rows_before_vacuum
  , size_before_vacuum
  , reclaimable_rows
  , reclaimed_rows
  , reclaimed_blocks
  , sortedrows_before_vacuum
  , sortedrows_after_vacuum
FROM    
  sys_vacuum_history
WHERE    status LIKE '%Finished%'
ORDER BY
  start_time;

We get the following output.

Data ingestion monitoring

In this section, we discuss how to monitor data ingestion.

Summary of ingestion

SYS_LOAD_HISTORY provides details into the statistics of COPY commands. Use this view for summarized insights into your ingestion workload. The following example query provides an hourly summary of ingestion broken down by tables in which data was ingested:

SELECT  
  date_trunc('hour', start_time) period_hourly
  , database_name
  , table_name
  , status
  , file_format
  , SUM(loaded_rows) total_rows_ingested
  , SUM(loaded_bytes) total_bytes_ingested
  , SUM(source_file_count) num_of_files_to_process
  , SUM(file_count_scanned) num_of_files_processed
  , SUM(error_count) total_errors
FROM    
  sys_load_history
GROUP BY
  period_hourly
  , database_name
  , table_name
  , status
  , file_format
ORDER BY
  table_name
  , period_hourly
  , status;

We get the following output.

File-level ingress logging

SYS_LOAD_DETAIL provides more granular insights into how ingestion is performed at the file level. For example, see the following query using sys_load_history:

SELECT  
  *
FROM    
  sys_load_history
WHERE table_name = 'catalog_sales'
ORDER BY
  start_time;

We get the following output.

The following example shows what detailed file-level monitoring looks like:

 SELECT  
  user_id
  , query_id
  , TRIM(file_name) file_name
  , bytes_scanned
  , lines_scanned
  , splits_scanned
  , record_time
  , start_time
  , end_time
FROM    
  sys_load_detail
WHERE query_id = 1824870
ORDER BY
  start_time;

Check for errors during ingress process

SYS_LOAD_ERROR_DETAIL enables you to track and troubleshoot errors that may have occurred during the ingestion process. This view logs details for the file that encountered the error during the ingestion process along with the line number at which the error occurred and column details within that line. See the following code:

select * from sys_load_error_detail order by start_time limit 100;

We get the following output.

External query monitoring

SYS_EXTERNAL_QUERY_DETAIL provides run details for external queries, which includes Amazon Redshift Spectrum and federated queries. This view logs details at the segment level and provides useful insights to troubleshoot and monitor performance of external queries in a single monitoring view. The following are a few useful metrics and data points this monitoring view provides:

  • Number of external files scanned (scanned_files) and format of external files (file_format) such as Parquet, text file, and so on
  • Data scanned in terms of rows (returned_rows) and bytes (returned_bytes)
  • Usage of partitioning (total_partitions and qualified_partitions) by external queries and tables
  • Granular insights into time taken in listing (s3list_time) and qualifying partitions (get_partition_time) for a given external object
  • External file location (file_location) and external table name (table_name)
  • Type of external source (source_type), such as Amazon Simple Storage Service (Amazon S3) for Redshift Spectrum, or federated
  • Recursive scan for subdirectories (is_recursive) or access of nested column data type (is_nested)

For example, the following query shows the daily summary of the number of external queries run and data scanned:

SELECT  
  DATE_TRUNC('hour', start_time) period_hourly
  , user_id
  , TRIM(source_type) source_type
  , COUNT (DISTINCT query_id) query_counts
  , SUM(returned_rows) returned_rows
  , ROUND(SUM(returned_bytes) / 1024^3,2) returned_gb
FROM    
  sys_external_query_detail
GROUP BY
  period_hourly
  , user_id
  , source_type
ORDER BY
  period_hourly
  , user_id
  , source_type;

We get the following output.

Usage of partitions

You can verify whether the external queries scanning large sums of data and files are partitioned or not. When you use partitions, you can restrict the amount of data that your external query has to scan by pruning based on the partition key. See the following code:

SELECT  
  file_location
  , CASE
      WHEN NVL(total_partitions,0) = 0
      THEN 'No'
      ELSE 'Yes'
    END is_partitioned
  , SUM(scanned_files) total_scanned_files
  , COUNT(DISTINCT query_id) query_count
FROM    
  sys_external_query_detail
GROUP BY
  file_location
  , is_partitioned
ORDER BY
  total_scanned_files DESC;

We get the following output.

For any errors encountered with external queries, look into SYS_EXTERNAL_QUERY_ERROR, which logs details at the granularity of file_location, column, and rowid within that file.

Slow query performance troubleshooting

Refer to the sysview_slow_query_performance_troubleshooting SQL notebook downloaded as part of the prerequisites for a step-by-step guide on how to perform query-level troubleshooting using SYS monitoring views and find answers to the following questions:

  • Do the queries being compared have similar query text?
  • Did the query use the result cache?
  • Which parts of the query lifecycle (queuing, compilation, planning, lock wait) are contributing the most to query runtimes?
  • Has the query plan changed?
  • Is the query reading more data blocks?
  • Is the query spilling to disk? If so, is it spilling to local or remote storage?
  • Is the query highly skewed with respect to data (distribution) and time (runtime)?
  • Do you see more rows processed in join steps or nested loops?
  • Are there any alerts indicating staleness in statistics?
  • When was the last vacuum and analyze performed for the tables involved in the query?

Clean up

If you created any Redshift provisioned clusters or Redshift Serverless workgroups as part of this post and no longer need them for your workloads, you can delete them to avoid incurring additional costs.

Conclusion

In this post, we explained how you can use the Redshift SYS monitoring views to monitor workloads of provisioned clusters and serverless workgroups. The SYS monitoring views provide simplified monitoring of the workloads, access to various query-level monitoring metrics from a unified view, and the ability to use the same SYS monitoring view query to run across both provisioned clusters and serverless workgroups. We also covered some key monitoring and troubleshooting scenarios using SYS monitoring views.

We encourage you to start using the new SYS monitoring views for your Redshift workloads. If you have any feedback or questions, please leave them in the comments.


About the authors

Urvish Shah is a Senior Database Engineer at Amazon Redshift. He has more than a decade of experience working on databases, data warehousing and in analytics space. Outside of work, he enjoys cooking, travelling and spending time with his daughter.

Ranjan Burman is a Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and helps customers build scalable analytical solutions. He has more than 15 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with the use of cloud solutions.

Build multi-layer maps in Amazon OpenSearch Service

Post Syndicated from Aish Gunasekar original https://aws.amazon.com/blogs/big-data/build-multi-layer-maps-in-amazon-opensearch-service/

With the release of Amazon OpenSearch Service 2.5, you can create maps with multiple layers to visualize your geographical data. You can build each layer from a different index pattern to separate data sources. Organizing the map in layers makes it more straightforward to visualize, view, and analyze geographical data. The layering also helps fetch data from various sources, view different data at different zoom levels, and continuously refresh data for a real-time dataset. Additionally, in OpenSearch 2.6, you can add multi-layer maps to dashboard panes within OpenSearch Dashboards, which makes it more straightforward to analyze your geospatial data in the context of other visualizations. OpenSearch Service provisions all the resources for your OpenSearch cluster so you can focus on building the business value through maps and other features of OpenSearch rather than spending time managing your deployment.

In this post, we show how to use multi-layer maps in OpenSearch Service.

Solution overview

The multi-layer map helps users visualize data and gain insights using specific layers and zoom levels to help emphasize key messages. For our use case, we use multi-layer maps to support an example real-estate application. Your index will have fields such as location, address, availability, number of bedrooms, price, and more. Initially, we develop a map with state boundaries with aggregated data for an overview. As the user zooms in, they’ll see city boundaries and postal codes. As the user continues to zoom, they’ll see each house with information like the price, number of bedrooms, and so on. You will build various data layers from different data sources to accomplish this task. You can also add filter queries; for example, to only show properties that are available.

Prerequisites

Complete the following prerequisite steps to configure the sample data:

  1. Create an OpenSearch Service domain (version 2.7 or higher).
  2. Download the file bulk_request_realestate.txt.
  3. Copy and paste the entire contents of the downloaded file into the OpenSearch Dashboards console.
  4. Run the commands.

These commands create the realestate index and upload records into the catalog.

Now let’s visualize this data in a multi-layer map.

Add layers

To create your map and add layers, complete the following steps:

  1. On OpenSearch Dashboards, in the navigation pane, under OpenSearch plugins, choose Maps.
  2. Choose Create map.

You will see a default map (or basemap) loaded on the page with a Layers pane on the left. This serves as a canvas for the data. The OpenSearch basemap utilizes vector tiles, resulting in quicker loading speeds and seamless zooming compared to raster tile maps. It effectively accommodates zoom levels ranging from 0–22. 0 is the most zoomed out with the global view, and zoom level 22 is roughly a half-inch per pixel resolution. Any additional layers you add will appear in this pane.

  1. Choose Add layer.
  2. In the prompt, choose the option to add documents for the data layer.
  3. Under Documents, choose the index you created (realestate) as the data source.
  4. For the geospatial field, choose the field containing geopoints (such as location in this example).
  5. Keep the remaining settings at their default values, then choose Update.

In the Layers pane, a newly generated layer named New Layer 2 will be visible. Additionally, you will observe the presence of all geopoints on the map (green dots in the following screenshot).

Update the layer name and enable tooltips

For an overview of the various configuration choices accessible for the layers, choose New Layer 2. This action opens another pane containing three tabs: Data, Style, and Settings. Let’s modify the layer’s name to something more relevant and enable tooltips. Complete the following steps:

  1. On the Settings tab, replace New Layer 2 with realestate-data in the Name field.
  2. On the Data tab, scroll down to Tool tips, and select Tool tips.
  3. Enter region as the tooltip.
  4. Choose Update.

The altered name should now be visible in the left pane. The geopoints themselves don’t convey any information. However, with tooltips enabled, you can access comprehensive information depending on the fields selected. As you hover over the geopoints, you will observe the chosen tooltip information—in this instance, region CA.

Adjust zoom levels

The essence of this functionality lies in the ability to observe your data through distinct layers at varying zoom levels. To achieve this, generate a layer using the same process as before. The following example shows a new layer (locality) featuring tooltips displaying locality and postal code. You can also choose the color of your geographical points on the Style tab. On the Settings tab, you’ll encounter options for zoom levels, allowing you to input minimum and maximum values—like 4 and 6, for instance. Consequently, this indicates that the layer will be visible exclusively within this range of zoom levels.

In the Layers pane, you can observe three layers alongside the locality layer created in the previous step. A notification will indicate “Layer is hidden outside of zoom range 1-2.” This layer becomes visible as you zoom in.

The realestate-data layer is set with a default zoom range of 0–22, ensuring visibility across all levels unless manually hidden. The locality layer is configured to be visible exclusively within the zoom range of 1–2.

As shown in the following screenshot, the tooltip for the realestate-data layer remains visible even after the fourth zoom level. To access the tooltip information for the locality layer, choose the eye icon next to realestate-data to manually conceal this layer. Once completed, hovering over the geopoints will reveal the tooltip details for locality (postal code and locality).

The following are some key points to consider:

  • Each layer can be established with distinct colors for its geographical points. For instance, the realestate-data layer is depicted in green, while the locality layer uses orange.
  • It’s possible to observe geopoints in a color that wasn’t directly chosen. In the following screenshot, brown is visible due to the overlapping of two layers at the same zoom level.
  • You can observe the color shift to the layer’s designated color—orange—after realestate-data is manually hidden, because there’s no longer an overlap between the layers.

You can generate an additional layer designed to display tooltip data such as the count of beds, baths, price, and square footage. This layer will be active within the zoom range of 3–4.

To save your project, choose Save. Enter a title, such as realestate-multilayer-map, then choose Save again. Your multilayer map is now successfully saved!

Exploring the multi-level map

After you have established all the layers, take note of how the layers become visible or invisible at each zoom level. Observe the dynamic adjustments in tooltip information that correspond to these changes as you zoom.

Add a filter

After you have generated multiple layers and successfully visualized your geopoints, you might find that you are interested in specific properties, such as within a particular price range.

To add a filter at layer level, complete the following steps:

  1. In the right pane, on the Data tab, choose Filters.
  2. Input price as the filter criteria.
  3. Select is between as the operator.
  4. Enter 800000 for the start of the range and 1400000 for the end of the range.
  5. Choose Save to update the layer.

You’ll immediately observe the filter taking effect, resulting in the display of only the relevant data matching the filter.

An alternative method to establish a filter involves drawing shapes on the map, such as rectangles or polygons. In this instance, you’ll be utilizing the polygon option. (For API-based filtering, refer to APIs).

  1. Choose the polygon icon on the right side of the map.
  2. For Filter label, enter a name for the filter.
  3. Draw the shape over the map area that you want to select.
  4. For a polygon, select any starting point on the map (this point becomes a polygon vertex) and hover (do not drag) to each subsequent vertex and select that point.
  5. Make sure to select the starting point again to close the polygon, as shown in the following screenshot.

Add a map to a dashboard

You can add this map to an existing or a new dashboard. Complete the following steps:

  1. On OpenSearch Dashboards, choose Create and choose Dashboard.

  1. Select Add an existing dashboard.
  2. Choose realestate-multilayer from the list.

You can see the new visualization on your dashboard.

  1. Choose Save and enter a title for the dashboard.

Conclusion

In this post, you effectively established multi-layer maps for data visualization, analyzed geographic data, observed various data at varying zoom levels, added tooltips for supplementary data visualization, and added multi-layer maps to dashboard panes within OpenSearch Dashboards to easily analyze your geospatial data. Refer to Using maps for an alternative use case and detailed information about map features.


About the authors

Aish Gunasekar is a Specialist Solutions Architect with a focus on Amazon OpenSearch Service. Her passion at AWS is to help customers design highly scalable architectures and help them in their cloud adoption journey. Outside of work, she enjoys hiking and baking.

Satish Nandi is a Senior Technical Product Manager for Amazon OpenSearch Service.

Jon Handler is a Senior Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have search and log analytics workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included 4 years of coding a large-scale, ecommerce search engine. Jon holds a Bachelor of the Arts from the University of Pennsylvania, and a Master of Science and a PhD in Computer Science and Artificial Intelligence from Northwestern University.

Run Spark SQL on Amazon Athena Spark

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/run-spark-sql-on-amazon-athena-spark/

At AWS re:Invent 2022, Amazon Athena launched support for Apache Spark. With this launch, Amazon Athena supports two open-source query engines: Apache Spark and Trino. Athena Spark allows you to build Apache Spark applications using a simplified notebook experience on the Athena console or through Athena APIs. Athena Spark notebooks support PySpark and notebook magics to allow you to work with Spark SQL. For interactive applications, Athena Spark allows you to spend less time waiting and be more productive, with application startup time in under a second. And because Athena is serverless and fully managed, you can run your workloads without worrying about the underlying infrastructure.

Modern applications store massive amounts of data on Amazon Simple Storage Service (Amazon S3) data lakes, providing cost-effective and highly durable storage, and allowing you to run analytics and machine learning (ML) from your data lake to generate insights on your data. Before you run these workloads, most customers run SQL queries to interactively extract, filter, join, and aggregate data into a shape that can be used for decision-making, model training, or inference. Running SQL on data lakes is fast, and Athena provides an optimized, Trino- and Presto-compatible API that includes a powerful optimizer. In addition, organizations across multiple industries such as financial services, healthcare, and retail are adopting Apache Spark, a popular open-source, distributed processing system that is optimized for fast analytics and advanced transformations against data of any size. With support in Athena for Apache Spark, you can use both Spark SQL and PySpark in a single notebook to generate application insights or build models. Start with Spark SQL to extract, filter, and project attributes that you want to work with. Then to perform more complex data analysis such as regression tests and time series forecasting, you can use Apache Spark with Python, which allows you to take advantage of a rich ecosystem of libraries, including data visualization in Matplot, Seaborn, and Plotly.

In this first post of a three-part series, we show you how to get started using Spark SQL in Athena notebooks. We demonstrate querying databases and tables in the Amazon S3 and the AWS Glue Data Catalog using Spark SQL in Athena. We cover some common and advanced SQL commands used in Spark SQL, and show you how to use Python to extend your functionality with user-defined functions (UDFs) as well as to visualize queried data. In the next post, we’ll show you how to use Athena Spark with open-source transactional table formats. In the third post, we’ll cover analyzing data sources other than Amazon S3 using Athena Spark.

Prerequisites

To get started, you will need the following:

Provide Athena Spark access to your data through an IAM role

As you proceed through this walkthrough, we create new databases and tables. By default, Athena Spark doesn’t have permission to do this. To provide this access, you can add the following inline policy to the AWS Identity and Access Management (IAM) role attached to the workgroup, providing the region and your account number. For more information, refer to the section To embed an inline policy for a user or role (console) in Adding IAM identity permissions (console).

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Sid": "ReadfromPublicS3",
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::athena-examples-us-east-1/*",
              "arn:aws:s3:::athena-examples-us-east-1"
          ]
      },
      {
            "Sid": "GlueReadDatabases",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabases"
            ],
            "Resource": "arn:aws:glue:<region>:<account-id>:*"
        },
        {
            "Sid": "GlueReadDatabase",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*",
                "arn:aws:glue:<region>:<account-id>:database/default"
            ]
        },
        {
            "Sid": "GlueCreateDatabase",
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb"
            ]
        },
        {
            "Sid": "GlueDeleteDatabase",
            "Effect": "Allow",
            "Action": "glue:DeleteDatabase",
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*"            ]
        },
        {
            "Sid": "GlueCreateDeleteTablePartitions",
            "Effect": "Allow",
            "Action": [
                "glue:CreateTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*"
            ]
        }
  ]
}

Run SQL queries directly in notebook without using Python

When using Athena Spark notebooks, we can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook that change the cells’ behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena Spark.

Now that we have our workgroup and notebook created, let’s start exploring the NOAA Global Surface Summary of Day dataset, which provides environmental measures from various locations all over the earth. The datasets used in this post are public datasets hosted in the following Amazon S3 locations:

  • Parquet data for year 2020s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/
  • Parquet data for year 2021 s3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/year=2021/
  • Parquet data from year 2022s3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/year=2022/

To use this data, we need an AWS Glue Data Catalog database that acts as the metastore for Athena, allowing us to create external tables that point to the location of datasets in Amazon S3. First, we create a database in the Data Catalog using Athena and Spark.

Create a database

Run following SQL in your notebook using %%sql magic:

%%sql 
CREATE DATABASE sparkblogdb

You get the following output:
Output of CREATE DATABASE SQL

Create a table

Now that we have created a database in the Data Catalog, we can create a partitioned table that points to our dataset stored in Amazon S3:

%%sql
CREATE EXTERNAL TABLE sparkblogdb.noaa_pq(
  station string, 
  date string, 
  latitude string, 
  longitude string, 
  elevation string, 
  name string, 
  temp string, 
  temp_attributes string, 
  dewp string, 
  dewp_attributes string, 
  slp string, 
  slp_attributes string, 
  stp string, 
  stp_attributes string, 
  visib string, 
  visib_attributes string, 
  wdsp string, 
  wdsp_attributes string, 
  mxspd string, 
  gust string, 
  max string, 
  max_attributes string, 
  min string, 
  min_attributes string, 
  prcp string, 
  prcp_attributes string, 
  sndp string, 
  frshtt string)
  PARTITIONED BY (year string)
STORED AS PARQUET
LOCATION 's3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/'

This dataset is partitioned by year, meaning that we store data files for each year separately, which simplifies management and improves performance because we can target the specific S3 locations in a query. The Data Catalog knows about the table, and now we’ll let it work out how many partitions we have automatically by using the MSCK utility:

%%sql
MSCK REPAIR TABLE sparkblogdb.noaa_pq

When the preceding statement is complete, you can run the following command to list the yearly partitions that were found in the table:

%%sql
SHOW PARTITIONS sparkblogdb.noaa_pq

Output of SHOW PARTITIONS SQL

Now that we have the table created and partitions added, let’s run a query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select year, min(MIN) as minimum_temperature 
from sparkblogdb.noaa_pq 
where name = 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1

You get the following output:

The image shows output of previous SQL statement.

Query a cross-account Data Catalog from Athena Spark

Athena supports accessing cross-account AWS Glue Data Catalogs, which enables you to use Spark SQL in Athena Spark to query a Data Catalog in an authorized AWS account.

The cross-account Data Catalog access pattern is often used in a data mesh architecture, when a data producer wants to share a catalog and data with consumer accounts. The consumer accounts can then perform data analysis and explorations on the shared data. This is a simplified model where we don’t need to use AWS Lake Formation data sharing. The following diagram gives an overview of how the setup works between one producer and one consumer account, which can be extended to multiple producer and consumer accounts.

The image gives an overview of how the setup works between one producer and one consumer account, which can be extended to multiple producer and consumer accounts.

You need to set up the right access policies on the Data Catalog of the producer account to enable cross-account access. Specifically, you must make sure the consumer account’s IAM role used to run Spark calculations on Athena has access to the cross-account Data Catalog and data in Amazon S3. For setup instructions, refer to Configuring cross-account AWS Glue access in Athena for Spark.

There are two ways the consumer account can access the cross-account Data Catalog from Athena Spark, depending on whether you are querying from one producer account or multiple.

Query a single producer table

If you are just querying data from a single producer’s AWS account, you can tell Athena Spark to only use that account’s catalog to resolve database objects. When using this option, you don’t have to modify the SQL because you’re configuring the AWS account ID at session level. To enable this method, edit the session and set the property "spark.hadoop.hive.metastore.glue.catalogid": "999999999999" using the following steps:

  1. In the notebook editor, on the Session menu, choose Edit session.
    Image shows wherre to click to edit session
  2. Choose Edit in JSON.
  3. Add the following property and choose Save:
    {"spark.hadoop.hive.metastore.glue.catalogid": "999999999999"}The image shows where to put JSON config property to query single producerThis will start a new session with the updated parameters.
  4. Run the following SQL statement in Spark to query tables from the producer account’s catalog:
    %%sql
    SELECT * 
    FROM <central-catalog-db>.<table> 
    LIMIT 10

Query multiple producer tables

Alternatively, you can add the producer AWS account ID in each database name, which is helpful if you’re going to query Data Catalogs from different owners. To enable this method, set the property {"spark.hadoop.aws.glue.catalog.separator": "/"} when invoking or editing the session (using the same steps as the previous section). Then, you add the AWS account ID for the source Data Catalog as part of the database name:

%%sql
SELECT * 
FROM `<producer-account1-id>/database1`.table1 t1 
join `<producer-account2-id>/database2`.table2 t2 
ON t1.id = t2.id
limit 10

If the S3 bucket belonging to the producer AWS account is configured with Requester Pays enabled, the consumer is charged instead of the bucket owner for requests and downloads. In this case, you can add the following property when invoking or editing an Athena Spark session to read data from these buckets:

{"spark.hadoop.fs.s3.useRequesterPaysHeader": "true"}

Infer the schema of your data in Amazon S3 and join with tables crawled in the Data Catalog

Rather than only being able to go through the Data Catalog to understand the table structure, Spark can infer schema and read data directly from storage. This feature allows data analysts and data scientists to perform a quick exploration of the data without needing to create a database or table, but which can also be used with other existing tables stored in the Data Catalog in the same or across different accounts. To do this, we use a Spark temp view, which is an in-memory data structure that stores the schema of data stored in a data frame.

Using the NOAA dataset partition for 2020, we create a temporary view by reading S3 data into a data frame:

year_20_pq = spark.read.parquet(f"s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/")
year_20_pq.createOrReplaceTempView("y20view")

Now you can query the y20view using Spark SQL as if it were a Data Catalog database:

%%sql
select count(*) 
from y20view

Output of previous SQL query showing count value

You can query data from both temporary views and Data Catalog tables in the same query in Spark. For example, now that we have a table containing data for years 2021 and 2022, and a temporary view with 2020’s data, we can find the dates in each year when the maximum temperature was recorded for 'SEATTLE TACOMA AIRPORT, WA US'.

To do this, we can use the window function and UNION:

%%sql
SELECT date,
       max as maximum_temperature
FROM (
        SELECT date,
            max,
            RANK() OVER (
                PARTITION BY year
                ORDER BY max DESC
            ) rnk
        FROM sparkblogdb.noaa_pq
        WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
          AND year IN ('2021', '2022')
        UNION ALL
        SELECT date,
            max,
            RANK() OVER (
                ORDER BY max DESC
            ) rnk
        FROM y20view
        WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
    ) t
WHERE rnk = 1
ORDER by 1

You get the following output:

Output of previous SQL

Extend your SQL with a UDF in Spark SQL

You can extend your SQL functionality by registering and using a custom user-defined function in Athena Spark. These UDFs are used in addition to the common predefined functions available in Spark SQL, and once created, can be reused many times within a given session.

In this section, we walk through a straightforward UDF that converts a numeric month value into the full month name. You have the option to write the UDF in either Java or Python.

Java-based UDF

The Java code for the UDF can be found in the GitHub repository. For this post, we have uploaded a prebuilt JAR of the UDF to s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.jar.

To register the UDF, we use Spark SQL to create a temporary function:

%%sql
CREATE OR REPLACE TEMPORARY FUNCTION 
month_number_to_name as 'com.example.MonthNumbertoNameUDF'
using jar "s3a://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.jar";

Now that the UDF is registered, we can call it in a query to find the minimum recorded temperature for each month of 2022:

%%sql
select month_number_to_name(month(to_date(date,'yyyy-MM-dd'))) as month_yr_21,
min(min) as min_temp
from sparkblogdb.noaa_pq 
where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 
order by 2

You get the following output:

Output of SQL using UDF

Python-based UDF

Now let’s see how to add a Python UDF to the existing Spark session. The Python code for the UDF can be found in the GitHub repository. For this post, the code has been uploaded to s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.py.

Python UDFs can’t be registered in Spark SQL, so instead we use a small bit of PySpark code to add the Python file, import the function, and then register it as a UDF:

sc.addPyFile('s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.py')

from month_number_to_name import month_number_to_name
spark.udf.register("month_number_to_name_py",month_number_to_name)

Now that the Python-based UDF is registered, we can use the same query from earlier to find the minimum recorded temperature for each month of 2022. The fact that it’s Python rather than Java doesn’t matter now:

%%sql
select month_number_to_name_py(month(to_date(date,'yyyy-MM-dd'))) as month_yr_21,
min(min) as min_temp
from sparkblogdb.noaa_pq 
where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 
order by 2

The output should be similar to that in the preceding section.

Plot visuals from the SQL queries

It’s straightforward to use Spark SQL, including across AWS accounts for data exploration, and not complicated to extend Athena Spark with UDFs. Now let’s see how we can go beyond SQL using Python to visualize data within the same Spark session to look for patterns in the data. We use the table and temporary views created previously to generate a pie chart that shows percentage of readings taken in each year for the station 'SEATTLE TACOMA AIRPORT, WA US'.

Let’s start by creating a Spark data frame from a SQL query and converting it to a pandas data frame:

#we will use spark.sql instead of %%sql magic to enclose the query string
#this will allow us to read the results of the query into a dataframe to use with our plot command
sqlDF = spark.sql("select year, count(*) as cnt from sparkblogdb.noaa_pq where name = 'SEATTLE TACOMA AIRPORT, WA US' group by 1 \
                  union all \
                  select 2020 as year, count(*) as cnt from y20view where name = 'SEATTLE TACOMA AIRPORT, WA US'")

#convert to pandas data frame
seatac_year_counts=sqlDF.toPandas()

Next, the following code uses the pandas data frame and Matplot library to plot a pie chart:

import matplotlib.pyplot as plt

# clear the state of the visualization figure
plt.clf()

# create a pie chart with values from the 'cnt' field, and yearly labels
plt.pie(seatac_year_counts.cnt, labels=seatac_year_counts.year, autopct='%1.1f%%')
%matplot plt

The following figure shows our output.

Output of code showing pie chart

Clean up

To clean up the resources created for this post, complete the following steps:

  1. Run the following SQL statements in the notebook’s cell to delete the database and tables from the Data Catalog:
    %%sql
    DROP TABLE sparkblogdb.noaa_pq
    
    %%sql
    DROP DATABASE sparkblogdb

  2. Delete the workgroup created for this post. This will also delete saved notebooks that are part of the workgroup.
  3. Delete the S3 bucket that you created as part of the workgroup.

Conclusion

Athena Spark makes it easier than ever to query databases and tables in the AWS Glue Data Catalog directly through Spark SQL in Athena, and to query data directly from Amazon S3 without needing a metastore for quick data exploration. It also makes it straightforward to use common and advanced SQL commands used in Spark SQL, including registering UDFs for custom functionality. Additionally, Athena Spark makes it effortless to use Python in a fast start notebook environment to visualize and analyze data queried via Spark SQL.

Overall, Spark SQL unlocks the ability to go beyond standard SQL in Athena, providing advanced users more flexibility and power through both SQL and Python in a single integrated notebook, and providing fast, complex analysis of data in Amazon S3 without infrastructure setup. To learn more about Athena Spark, refer to Amazon Athena for Apache Spark.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.