Tag Archives: Analytics

Free, Privacy-First Analytics for a Better Web

Post Syndicated from Jon Levine original https://blog.cloudflare.com/free-privacy-first-analytics-for-a-better-web/

Everyone with a website needs to know some basic facts about their website: what pages are people visiting? Where in the world are they? What other sites sent traffic to my website?

There are “free” analytics tools out there, but they come at a cost: not money, but your users’ privacy. Today we’re announcing a brand new, privacy-first analytics service that’s open to everyone — even if they’re not already a Cloudflare customer. And if you’re a Cloudflare customer, we’ve enhanced our analytics to make them even more powerful than before.

The most important analytics feature: Privacy

The most popular analytics services available were built to help ad-supported sites sell more ads. But, a lot of websites don’t have ads. So if you use those services, you’re giving up the privacy of your users in order to understand how what you’ve put online is performing.

Cloudflare’s business has never been built around tracking users or selling advertising. We don’t want to know what you do on the Internet — it’s not our business. So we wanted to build an analytics service that gets back to what really matters for web creators, not necessarily marketers, and to give web creators the information they need in a simple, clean way that doesn’t sacrifice their visitors’ privacy. And giving web creators these analytics shouldn’t depend on their use of Cloudflare’s infrastructure for performance and security. (More on that in a bit.)

What does it mean for us to make our analytics “privacy-first”? Most importantly, it means we don’t need to track individual users over time for the purposes of serving analytics. We don’t use any client-side state, like cookies or localStorage, for the purposes of tracking users. And we don’t “fingerprint” individuals via their IP address, User Agent string, or any other data for the purpose of displaying analytics. (We consider fingerprinting even more intrusive than cookies, because users have no way to opt out.)

Counting visits without tracking users

One of the most essential stats about any website is: "How many people went there?" Analytics tools frequently show counts of "unique" visitors, which requires tracking individual users via a cookie or IP address.

We use the concept of a visit: a privacy-friendly measure of how people have interacted with your website. A visit is defined simply as a successful page view that has an HTTP referer that doesn’t match the hostname of the request. This tells you how many times people came to your website and clicked around before navigating away, but doesn’t require tracking individuals.
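
To make the definition concrete, here is a minimal sketch of that logic (an illustration only, not Cloudflare's actual implementation):

from urllib.parse import urlparse

def is_visit(request_host, referer, status):
    """Return True if a page view should be counted as a visit.

    A visit is a successful page view whose HTTP referer does not match
    the hostname of the request. (Sketch only; treating a missing referer
    as a visit is an assumption.)
    """
    if not (200 <= status < 300):
        return False                      # only successful page views count
    if not referer:
        return True                       # direct navigation: no referer to match
    return urlparse(referer).hostname != request_host

# A click from another site counts as a visit; an internal click does not.
print(is_visit("example.com", "https://news.example.net/post", 200))  # True
print(is_visit("example.com", "https://example.com/home", 200))       # False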

A visit has slightly different semantics from a “unique”, and you should expect this number to differ from other analytics tools.

All of the details, none of the bots

Our analytics deliver the most important metrics about your website, like page views and visits. But we know that an essential analytics feature is flexibility: the ability to add arbitrary filters, and slice-and-dice data as you see fit. Our analytics can show you the top hostnames, URLs, countries, and other critical metrics like status codes. You can filter on any of these metrics with a click and see the whole dashboard update.

I’m especially excited about two features in our time series charts: the ability to drag-to-zoom into a narrower time range, and the ability to “group by” different dimensions to see data in a different way. This is a super powerful way to drill into an anomaly in traffic and quickly see what’s going on. For example, you might notice a spike in traffic, zoom into that spike, and then try different groupings to see what contributed the extra clicks.

And for customers of our Bot Management product, we’re working on the ability to detect (and remove) automated traffic. Coming very soon, you’ll be able to see which bots are reaching your website and, with just a click, block them using Firewall Rules.

This is all possible thanks to our ABR analytics technology, which enables us to serve analytics very quickly for websites large and small. Check out our blog post to learn more about how this works.

Edge or Browser analytics? Why not both?

There are two ways to collect web analytics data: at the edge (or on an origin server), or in the client using a JavaScript beacon.

Historically, Cloudflare has collected analytics data at our edge. This has some nice benefits over traditional, client-side analytics approaches:

  • It’s more accurate because you don’t miss users who block third-party scripts, or JavaScript altogether
  • You can see all of the traffic back to your origin server, even if an HTML page doesn’t load
  • We can detect (and block) bots, apply Firewall Rules, and generally scrub traffic of unwanted noise
  • You can measure the performance of your origin server

More commonly, most web analytics providers use client-side measurement. This has some benefits as well:

  • You can understand performance as your users see it — e.g. how long did the page actually take to render
  • You can detect errors in client-side JavaScript execution
  • You can define custom event types emitted by JavaScript frameworks

Ultimately, we want our customers to have the best of both worlds. We think it’s really powerful to get web traffic numbers directly from the edge. We also launched Browser Insights a year ago to augment our existing edge analytics with more performance information, and today Browser Insights are taking a big step forward by incorporating Web Vitals metrics.

But, we know not everyone can modify their DNS to take advantage of Cloudflare’s edge services. That’s why today we’re announcing a free, standalone analytics product for everyone.

How do I get it?

For existing Cloudflare customers on our Pro, Biz, and Enterprise plans, just go to your Analytics tab! Starting today, you’ll see a banner to opt-in to the new analytics experience. (We plan to make this the default in a few weeks.)

But when building privacy-first analytics, we realized it’s important to make this accessible even to folks who don’t use Cloudflare today. You’ll be able to use Cloudflare’s web analytics even if you can’t change your DNS servers — just add our JavaScript, and you’re good to go.

We’re still putting on the finishing touches on our JavaScript-based analytics, but you can sign up here and we’ll let you know when it’s ready.

The evolution of analytics at Cloudflare

Just over a year ago, Cloudflare’s analytics consisted of a simple set of metrics: cached vs uncached data transfer, or how many requests were blocked by the Firewall. Today we provide flexible, powerful analytics across all our products, including Firewall, Cache, Load Balancing and Network traffic.

While we’ve been focused on building analytics about our products, we realized that our analytics are also powerful as a standalone product. Today is just the first step on that journey. We have so much more planned: from real-time analytics, to ever-more performance analysis, and even allowing customers to add custom events.

We want to hear what you want most out of analytics — drop a note in the comments to let us know what you want to see next.

Explaining Cloudflare’s ABR Analytics

Post Syndicated from Jamie Herre original https://blog.cloudflare.com/explaining-cloudflares-abr-analytics/

Cloudflare’s analytics products help customers answer questions about their traffic by analyzing the mind-boggling, ever-increasing number of events (HTTP requests, Workers requests, Spectrum events) logged by Cloudflare products every day.  The answers to these questions depend on the point of view of the question being asked, and we’ve come up with a way to exploit this fact to improve the quality and responsiveness of our analytics.

Useful Accuracy

Consider the following questions and answers:

  • What is the length of the coastline of Great Britain? 12.4K km
  • What is the total world population? 7.8B
  • How many stars are in the Milky Way? 250B
  • What is the total volume of the Antarctic ice shelf? 25.4M km³
  • What is the worldwide production of lentils? 6.3M tonnes
  • How many HTTP requests hit my site in the last week? 22.6M

Useful answers do not benefit from being overly exact.  For large quantities, knowing the correct order of magnitude and a few significant digits gives the most useful answer.  At Cloudflare, the difference in traffic between different sites or when a single site is under attack can cross nine orders of magnitude and, in general, all our traffic follows a Pareto distribution, meaning that what’s appropriate for one site or one moment in time might not work for another.

Because of this distribution, a query that scans a few hundred records for one customer will need to scan billions for another.  A report that needs to load a handful of rows under normal operation might need to load millions when a site is under attack.

To get a sense of the relative difference of each of these numbers, remember “Powers of Ten”, an amazing visualization that Ray and Charles Eames produced in 1977.  Notice that the scale of an image determines what resolution is practical for recording and displaying it.

Using ABR to determine resolution

This basic fact informed our design and implementation of ABR for Cloudflare analytics. ABR stands for “Adaptive Bit Rate”; the name is borrowed from the technique used in video streaming, such as Cloudflare’s own Stream Delivery, where the server selects the best resolution for a video stream to match your client and network connection.

In our case, every analytics query that supports ABR is calculated at a resolution matching the query. For example, if you want to know which country generated the most firewall events in the past week, the system might opt for a lower-resolution version of the firewall data than if you were only looking at the last hour. The lower-resolution version provides the same answer but takes less time and fewer resources. By using multiple resolutions of the same data, our analytics can provide consistent response times and a better user experience.

You might be aware that we use a columnar store called ClickHouse to store and process our analytics data.  When using ABR with ClickHouse, we write the same data at multiple resolutions into separate tables.  Usually, we cover seven orders of magnitude – from 100% to 0.0001% of the original events.  We wind up using an additional 12% of disk storage but enable very fast ad hoc queries on the reduced resolution tables.
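
As an illustration of the idea (table names, sample rates, and thresholds below are made up, not our actual schema), a query router can estimate how many rows a query would touch at full resolution, pick the coarsest sample that still leaves enough rows for a stable answer, and scale the results back up:

# Illustrative only: these names, rates, and thresholds are assumptions,
# not Cloudflare's actual ClickHouse schema.
RESOLUTIONS = [
    ("requests_sample_0_0001", 0.000001),  # 0.0001% of events
    ("requests_sample_0_01",   0.0001),    # 0.01%
    ("requests_sample_1",      0.01),      # 1%
    ("requests_full",          1.0),       # 100%
]

MIN_SAMPLED_ROWS = 10_000  # enough sampled rows for a stable estimate

def pick_resolution(estimated_full_rows):
    """Pick the coarsest table that still yields enough sampled rows."""
    for table, rate in RESOLUTIONS:
        if estimated_full_rows * rate >= MIN_SAMPLED_ROWS:
            return table, rate
    return RESOLUTIONS[-1]  # small data set: just use the full table

def build_query(estimated_full_rows, where_clause):
    table, rate = pick_resolution(estimated_full_rows)
    # Counts taken from a sampled table are scaled back up by 1/rate.
    return (
        f"SELECT country, toUInt64(count() / {rate}) AS requests "
        f"FROM {table} WHERE {where_clause} "
        f"GROUP BY country ORDER BY requests DESC LIMIT 10"
    )

print(build_query(5_000_000_000, "timestamp >= now() - INTERVAL 7 DAY"))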

Aggregations and Rollups

The ABR technique facilitates aggregations by making compact estimates of every dimension.  Another way to achieve the same ends is with a system that computes “rollups”.  Rollups save space by computing either complete or partial aggregations of the data as it arrives.  

For example, suppose we wanted to count the total number of lentils. (Lentils are legumes and among the oldest and most widely cultivated crops.  They are a staple food in many parts of the world.)  We could just count each lentil as it passed through the processing system. Of course, because there are a lot of lentils, that system is distributed – meaning that there are hundreds of separate machines.  Therefore we’ll actually have hundreds of separate counters.

Also, we’ll want to include more information than just the count, so we’ll also include the weight of each lentil and maybe 10 or 20 other attributes. And of course, we don’t want just a total for each attribute, but we’ll want to be able to break it down by color, origin, distributor and many other things, and also we’ll want to break these down by slices of time.

In the end, we’ll have tens of thousands or possibly millions of aggregations to be tabulated and saved every minute.  These aggregations are expensive to compute, especially when using aggregations more complicated than simple counters and sums.  They also destroy some information.  For example, once we’ve processed all the lentils through the rollups, we can’t say for sure that we’ve counted them all, and most importantly, whichever attributes we neglected to aggregate are unavailable.

The number we’re counting, 6.3M tonnes, has only two significant digits, a precision that can easily be achieved by counting a sample. Most of the rollup computations applied to each lentil (on the order of 10^13 of them, to account for 6.3M tonnes) are wasted.
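
To see why a sample is enough, here is a back-of-the-envelope sketch, assuming a simple uniform sample: the relative error of a sampled count shrinks with the square root of the number of sampled items, so even a tiny sampling rate easily delivers two significant digits.

import math

def estimate_total(sampled_items, sample_rate):
    """Estimate a total from a uniform sample and its rough relative error.

    With n sampled items, the relative standard error is about 1/sqrt(n),
    independent of the size of the full population.
    """
    estimate = sampled_items / sample_rate
    rel_error = 1 / math.sqrt(sampled_items) if sampled_items else float("inf")
    return estimate, rel_error

# Sampling ~10^13 lentils at a 0.0001% rate still leaves ~10^7 sampled lentils,
# so the estimated total is accurate to roughly +/- 0.03%.
total, err = estimate_total(sampled_items=10_000_000, sample_rate=0.000001)
print(f"estimated lentils: {total:.3g}  (~{err:.2%} relative error)")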

Other forms of aggregations

So far, we’ve discussed ABR and its application to aggregations, but we’ve only given examples involving “counts” and “sums”.  There are other, more complex forms of aggregations we use quite heavily.  Two examples are “topK” and “count-distinct”.

A “topK” aggregation attempts to show the K most frequent items in a set.  For example, the most frequent IP address, or country.  To compute topK, just count the frequency of each item in the set and return the K items with the highest frequencies. Under ABR, we compute topK based on the set found in the matching resolution sample. Using a sample makes this computation a lot faster and less complex, but there are problems.

The estimate of topK derived from a sample is biased and depends on the distribution of the underlying data. This can result in overestimating the significance of elements in the set compared to their frequency in the full set. In practice, this effect is only noticeable when the cardinality of the set is very high, and you’re not going to notice it on a Cloudflare dashboard. If your site has a lot of traffic and you’re looking at the top K URLs or browser types, there will be no visible difference between resolutions. Also keep in mind that as long as we’re estimating the proportion of an element in the set and the set is large, the results will be quite accurate.
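
As a toy illustration of the approach (not the exact estimator we use), top-K items and their proportions can be computed directly on the sampled rows and reported as estimates for the full set:

from collections import Counter

def estimate_top_k(sampled_values, sample_rate, k=5):
    """Estimate the top-K items and their share from a uniform sample.

    Counts are scaled by 1/sample_rate; proportions need no scaling at all,
    which is why they stay accurate even at low resolutions.
    """
    counts = Counter(sampled_values)
    total = sum(counts.values())
    return [
        (value, round(count / sample_rate), count / total)  # (item, est. count, share)
        for value, count in counts.most_common(k)
    ]

# e.g. country codes from a 1% sample of requests
sample = ["US", "GB", "US", "DE", "US", "GB", "FR", "US"]
for country, est_count, share in estimate_top_k(sample, sample_rate=0.01, k=3):
    print(f"{country}: ~{est_count} requests ({share:.0%} of traffic)")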

The other fascinating aggregation we support is known as “count-distinct”, or number of uniques.  In this case we want to know the number of unique values in a set.  For example, how many unique cache keys have been used.  We can safely say that a uniform random sample of the set cannot be used to estimate this number.  However, we do have a solution.

We can generate another, alternate sample based on the value in question.  For example, instead of taking a random sample of all requests, we take a random sample of IP addresses.  This is sometimes called distinct reservoir sampling, and it allows us to estimate the true number of distinct IPs based on the cardinality of the sampled set. Again, there are techniques available to improve these estimates, and we’ll be implementing some of those.
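
A rough sketch of the idea, with the bias corrections mentioned above deliberately omitted: keep only the values whose hash falls below a threshold, count the distinct values you kept, and scale up by the sampling probability.

import hashlib

def estimate_distinct(values, sample_probability=0.01):
    """Estimate the number of distinct values via value-based sampling.

    Sampling on the value (e.g. the IP address) rather than on the event
    keeps either all or none of each value's occurrences, so the distinct
    count of the sample can simply be scaled by 1/sample_probability.
    (Real implementations add bias corrections; this sketch omits them.)
    """
    threshold = int(sample_probability * 2**64)
    kept = set()
    for v in values:
        h = int.from_bytes(hashlib.blake2b(v.encode(), digest_size=8).digest(), "big")
        if h < threshold:
            kept.add(v)
    return len(kept) / sample_probability

ips = [f"10.0.{i % 256}.{i % 199}" for i in range(50_000)]
print(f"estimated distinct IPs: {estimate_distinct(ips):,.0f}")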

ABR improves resilience and scalability

Using ABR saves us resources. Even better, it allows us to query all the attributes in the original data, not just those included in rollups. And better still, because the original events are preserved, we can compare results across the different sample intervals stored in separate tables as a check that the system is working correctly.

However, the greatest benefits of employing ABR are the ones that aren’t directly visible. Even under ideal conditions, a large distributed system such as Cloudflare’s data pipeline is subject to high tail latency.  This occurs when any single part of the system takes longer than usual for any number of a long list of reasons.  In these cases, the ABR system will adapt to provide the best results available at that moment in time.

For example, compare this chart showing Cache Performance for a site under attack with the same chart generated a moment later while we simulate a failure of some of the servers in our cluster.  In the days before ABR, your Cloudflare dashboard would fail to load in this scenario.  Now, with ABR analytics, you won’t see significant degradation.

Stretching the analogy to ABR in video streaming, we want you to be able to enjoy your analytics dashboard without being bothered by issues related to faulty servers, or network latency, or long running queries.  With ABR you can get appropriate answers to your questions reliably and within a predictable amount of time.

In the coming months, we’re going to be releasing a variety of new dashboards and analytics products based on this simple but profound technology.  Watch your Cloudflare dashboard for increasingly useful and interactive analytics.

Start measuring Web Vitals with Browser Insights

Post Syndicated from Jon Levine original https://blog.cloudflare.com/start-measuring-web-vitals-with-browser-insights/

Many of us at Cloudflare obsess about how to make websites faster. But to improve performance, you have to measure it first. Last year we launched Browser Insights to help our customers measure web performance from the perspective of end users.

Today, we’re partnering with the Google Chrome team to bring Web Vitals measurements into Browser Insights. Web Vitals are a new set of metrics to help web developers and website owners measure and understand load time, responsiveness, and visual stability. And with Cloudflare’s Browser Insights, they’re easier to measure than ever – and it’s free for anyone to collect data from the whole web.

Why do we need Web Vitals?

When trying to understand performance, it’s tempting to focus on the metrics that are easy to measure — like Time To First Byte (TTFB). While TTFB and similar metrics are important to understand, we’ve learned that they don’t always tell the whole story.

Our partners on the Google Chrome team have tackled this problem by breaking down user experience into three components:

  • Loading: How long did it take for content to become available?
  • Interactivity: How responsive is the website when you interact with it?
  • Visual stability: How much does the page move around while loading? (I think of this as the inverse of “jankiness”)
This image is reproduced from work created and shared by Google and used according to terms described in the Creative Commons 4.0 Attribution License.

It’s challenging to create a single metric that captures these high-level components. Thankfully, the folks on the Google Chrome team have thought about this, and earlier this year introduced three “Core” Web Vitals metrics: Largest Contentful Paint, First Input Delay, and Cumulative Layout Shift.

How do Web Vitals help make your website faster?

Measuring the Core Web Vitals isn’t the end of the story. Rather, they’re a jumping-off point to understand what factors impact a website’s performance. Web Vitals tell you what is happening at a high level, and other, more detailed metrics help you understand why the user experience could be slow.

Take loading time, for example. If you notice that your Largest Contentful Paint score is “needs improvement”, you’ll want to dig into what is taking so long to load. Browser Insights still measures navigation timing metrics like DNS lookup time and TTFB. By analyzing these metrics in turn, you might decide to optimize cache hit rates, tune the performance of your origin server, or tweak the order in which resources like JavaScript and CSS load.

For more information about improving web performance, check out Google’s guides to improving LCP, FID, and CLS.

Why measure Web Vitals with Cloudflare?

First, we think that RUM (Real User Measurement) is a critical companion to synthetic measurement. While you can always try a few page loads on your own laptop and see the results, gathering data from real users is the only way to take into account real-life device performance and network conditions.

There are other great RUM tools out there. Google’s Chrome User Experience Report (CrUX) collects data about the entire web and makes it available through tools like Page Speed Insights (PSI), which combines synthetic and RUM results into useful diagnostic information.

One major benefit of Cloudflare’s Browser Insights is that it updates constantly; new data points are available shortly after a request from an end user is seen. The data in the Chrome UX Report is a 28-day rolling average of aggregated metrics, so you need to wait a while before changes are reflected in the data.

Another benefit of Browser Insights is that we can measure any browser — not just Chrome. As of this writing, the APIs necessary to report Web Vitals are only supported in Chromium browsers, but we’ll support Safari and Firefox when they implement those APIs.

Finally, Browser Insights is free to use! We’ve worked really hard to make our analytics blazing fast for websites with any amount of traffic. We’re excited to support slicing and grouping by URL, Browser, OS, and Country, and plan to support several more dimensions soon.

Push a button to start measuring

To start using Browser Insights, just head over to the Speed tab in the dashboard. Starting today, Web Vitals metrics are now available for everyone!

Behind the scenes, Browser Insights works by inserting a JavaScript “beacon” into HTML pages. You can control where the beacon loads if you only want to measure specific pages or hostnames. If you’re using CSP version 3, we’ll even automatically detect the nonce (if present) and add it to the script.

Where we’ve been, and where we’re going

We’ve been really proud of the success of Browser Insights. We’ve been hard at work over the last year making lots of improvements — for example, we’ve made the dashboard fast and responsive (and still free!) even for the largest websites.

Coming soon, we’re excited to make this available for all our Web Analytics customers — even those who don’t use Cloudflare today. We’re also hard at work adding much-requested features like client-side error reporting, and diagnostics tools to make it easier to understand where to improve.

Enabling Amazon QuickSight federation with Azure AD

Post Syndicated from Adnan Hasan original https://aws.amazon.com/blogs/big-data/enabling-amazon-quicksight-federation-with-azure-ad/

Customers today want to establish a single identity and access strategy across all of their apps, whether on-premises apps, third-party cloud apps (SaaS), or apps in AWS. If your organization uses Azure Active Directory (Azure AD) for cloud applications, you can enable single sign-on (SSO) for applications like Amazon QuickSight without needing to create another user account or remember passwords. You can also enable role-based access control to make sure users get the appropriate role permissions in QuickSight based on their entitlements stored in Active Directory attributes or granted through Active Directory group membership. The setup also allows administrators to focus on managing a single source of truth for user identities in Azure AD while having the convenience of configuring access to other AWS accounts and apps centrally.

In this post, we walk through the steps required to configure federated SSO between QuickSight and Azure AD. We also demonstrate ways to assign a QuickSight role based on Azure AD group membership. Administrators can publish the QuickSight app in the Azure App portal to enable users to SSO to QuickSight using their Azure AD or Active Directory credentials.

The solution in this post uses identity provider (IdP)-initiated SSO, which means your end users must log in to Azure AD and choose the published QuickSight app in the Azure App portal to sign in to QuickSight.

Registering a QuickSight application in Azure AD

Your first step is to create a QuickSight application in Azure AD.

  1. Log in to your Azure portal using the administrator account in the Azure AD tenant where you want to register the QuickSight application.
  2. Under Azure Services, open Azure Active Directory and, under Manage, choose Enterprise Applications.
  3. Choose New Application.
  4. Select Non-gallery application.
  5. For Name, enter Amazon QuickSight.
  6. Choose Add to register the application.

Creating users and groups in Azure AD

You can now create new users and groups or choose existing users and groups that can access QuickSight.

  1. Under Manage, choose All applications and open Amazon QuickSight.
  2. Under Getting Started, choose Assign users and groups.
  3. For this post, you create three groups, one for each QuickSight role:
    1. QuickSight-Admin
    2. QuickSight-Author
    3. QuickSight-Reader

For instructions on creating groups in Azure AD, see Create a basic group and add members using Azure Active Directory.

Configuring SSO in Azure AD

You can now start configuring the SSO settings for the app.

  1. Under Manage, choose Single sign-on.
  2. For Select a single sign-on method, choose SAML.
  3. To configure the sections, choose Edit.
  4. In the Basic SAML Configuration section, for Identifier (Entity ID), enter URN:AMAZON:WEBSERVICES.

This is the entity ID passed during the SAML exchange. Azure requires that this value be unique for each application. For additional AWS applications, you can append a number to the string; for example, URN:AMAZON:WEBSERVICES2.

  5. For Reply URL, enter https://signin.aws.amazon.com/saml.
  6. Leave Sign on URL blank.
  7. For Relay State, enter https://quicksight.aws.amazon.com.
  8. Leave Logout Url blank.

  9. Under SAML Signing Certificate, choose Download next to Federation Metadata XML.

You use this XML document later when setting up the SAML provider in AWS Identity and Access Management (IAM).

  10. Leave this tab open in your browser while moving on to the next steps.

Creating Azure AD as your SAML IdP in AWS

You now configure Azure AD as your SAML IdP.

  1. Open a new tab in your browser.
  2. Log in to the IAM console in your AWS account with admin permissions.
  3. On the IAM console, choose Identity providers.
  4. Choose Create provider.
  5. For Provider name, enter AzureActiveDirectory.
  6. Choose Choose File to upload the metadata document you downloaded earlier.
  7. Choose Next Step.
  8. Verify the provider information and choose Create.
  9. On the summary page, record the value for the provider ARN (arn:aws:iam::<AccountID>:saml-provider/AzureActiveDirectory).

You need this ARN to configure claims rules later in this post.

You can also complete this configuration using the AWS Command Line Interface (AWS CLI).
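
For example, a rough equivalent with the AWS SDK for Python (boto3), assuming the metadata document you downloaded from Azure AD is saved locally as federation_metadata.xml:

import boto3

iam = boto3.client("iam")

# Register Azure AD as a SAML identity provider using the federation
# metadata XML downloaded from the Azure portal.
with open("federation_metadata.xml") as f:
    response = iam.create_saml_provider(
        SAMLMetadataDocument=f.read(),
        Name="AzureActiveDirectory",
    )

print(response["SAMLProviderArn"])  # arn:aws:iam::<AccountID>:saml-provider/AzureActiveDirectory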

Configuring IAM policies

In this step, you create three IAM policies for different role permissions in QuickSight:

  • QuickSight-Federated-Admin
  • QuickSight-Federated-Author
  • QuickSight-Federated-Reader

Use the following steps to set up the QuickSight-Federated-Admin policy. This policy grants admin privileges in QuickSight to the federated user:

  1. On the IAM console, choose Policies.
  2. Choose Create Policy.
  3. Choose JSON and replace the existing text with the following code:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "quicksight:CreateAdmin",
                "Resource": "*"
            }
        ]
    }

  4. Choose Review policy.
  5. For Name, enter QuickSight-Federated-Admin.
  6. Choose Create policy.

Now repeat these steps to create the QuickSight-Federated-Author and QuickSight-Federated-Reader policies, using the following JSON code for each policy:

QuickSight-Federated-Author

The following policy grants author privileges in QuickSight to the federated user:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "quicksight:CreateUser",
            "Resource": "*"
        }
    ]
}

QuickSight-Federated-Reader

The following policy grants reader privileges in QuickSight to the federated user:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "quicksight:CreateReader",
            "Resource": "*"
        }
    ]
}

Configuring IAM roles

Next, create the roles that your Azure AD users assume when federating into QuickSight. The following steps set up the admin role:

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. For Select type of trusted entity, choose SAML 2.0 federation.
  4. For SAML provider, choose the provider you created earlier (AzureActiveDirectory).
  5. Select Allow programmatic and AWS Management Console access.
  6. For Attribute, make sure SAML:aud is selected.
  7. Value should show https://signin.aws.amazon.com/saml.
  8. Choose Next: Permissions.
  9. Choose the QuickSight-Federated-Admin IAM policy you created earlier.
  10. Choose Next: Tags.
  11. Choose Next: Review
  12. For Role name, enter QuickSight-Admin-Role.
  13. For Role description, enter a description.
  14. Choose Create role.
  15. On the IAM console, in the navigation pane, choose Roles.
  16. Choose the QuickSight-Admin-Role role you created to open the role’s properties.
  17. Record the role ARN to use later.
  18. On the Trust Relationships tab, choose Edit Trust Relationship.
  19. Under Trusted Entities, verify that the IdP you created is listed.
  20. Under Conditions, verify that SAML:aud with a value of https://signin.aws.amazon.com/saml is present.
  21. Repeat these steps to create your author and reader roles and attach the appropriate policies:
    1. For QuickSight-Author-Role, use the policy QuickSight-Federated-Author.
    2. For QuickSight-Reader-Role, use the policy QuickSight-Federated-Reader.
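
If you prefer to script the role creation, the trust relationship the console builds for a SAML 2.0 federated role looks roughly like the following boto3 sketch (the account ID is a placeholder, and you would still attach the matching QuickSight-Federated-* policy afterwards):

import json
import boto3

iam = boto3.client("iam")
provider_arn = "arn:aws:iam::<AccountID>:saml-provider/AzureActiveDirectory"

# Trust policy allowing Azure AD-federated users to assume the role via SAML.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Federated": provider_arn},
        "Action": "sts:AssumeRoleWithSAML",
        "Condition": {"StringEquals": {"SAML:aud": "https://signin.aws.amazon.com/saml"}},
    }],
}

iam.create_role(
    RoleName="QuickSight-Admin-Role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description="Admin role for Azure AD federated QuickSight access",
)
# Attach the QuickSight-Federated-Admin policy to this role afterwards (not shown).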

Configuring user attributes and claims in Azure AD

In this step, you return to the application in Azure portal and configure the user claims that Azure AD sends to AWS.

By default, several SAML attributes are populated for the new application, but you don’t need these attributes for federation into QuickSight. Under Additional Claims, select the unnecessary claims and choose Delete.

For this post, you create three claims:

  • Role
  • RoleSessionName
  • SAML_SUBJECT

Creating the Role claim

To create the Role claim, complete the following steps:

  1. Under Manage, choose Single sign-on.
  2. Choose Edit in the User Attributes & Claims section.
  3. Choose Add new claim.
  4. For Name, enter Role.
  5. For Namespace, enter https://aws.amazon.com/SAML/Attributes.
  6. Under Claim conditions, add a condition for the admin, author, and reader roles. Use the parameters in the following table:
User Type | Scoped Group | Source | Value
Any | QuickSight-Admin | Attribute | arn:aws:iam::253914981264:role/Quicksight-Admin-Role,arn:aws:iam::253914981264:saml-provider/AzureActiveDirectory
Any | QuickSight-Author | Attribute | arn:aws:iam::253914981264:role/Quicksight-Author-Role,arn:aws:iam::253914981264:saml-provider/AzureActiveDirectory
Any | QuickSight-Reader | Attribute | arn:aws:iam::253914981264:role/Quicksight-Reader-Role,arn:aws:iam::253914981264:saml-provider/AzureActiveDirectory

Creating the RoleSessionName claim

To create your RoleSessionName claim, complete the following steps:

  1. Choose Add new claim.
  2. For Name, enter RoleSessionName.
  3. For Namespace, enter https://aws.amazon.com/SAML/Attributes.
  4. For Source, choose Transformation.
  5. For Transformation, enter ExtractMailPrefix().
  6. For Parameter 1, enter user.userprincipalname.

We use the ExtractMailPrefix() function to extract the name from the userprincipalname attribute. For example, the function extracts the name joe from a user principal name of the form joe@<your domain>. IAM uses RoleSessionName to build the role session ID for the user signing in to QuickSight. The role session ID is made up of the Role name and RoleSessionName, in Role/RoleSessionName format. Users are registered in QuickSight with the role session ID as the username.

Creating the SAML_SUBJECT claim

To create your final claim, SAML_SUBJECT, complete the following steps:

  1. Choose Add new claim.
  2. For Name, enter SAML_SUBJECT.
  3. For Namespace, enter https://aws.amazon.com/SAML/Attributes.
  4. For Source, choose Attribute.
  5. For Source attribute, enter "Azure AD - QuickSight SSO".

Testing the application

You’re now ready to test the application.

  1. In the Azure portal, on the Azure Active Directory page, choose All groups.
  2. Update the group membership of the QuickSight-Admin group by adding the current user to it.
  3. Under Enterprise Applications, choose Amazon QuickSight.
  4. Under Manage, choose Single sign-on.
  5. Choose Test this application to test the authentication flow.
  6. Log in to QuickSight as an admin.

The following screenshot shows you the QuickSight dashboard for the admin user.

  1. Remove the current user from QuickSight-Admin Azure AD group and add it to QuickSight-Author group.

When you test the application flow, you log in to QuickSight as an author.

  1. Remove the current user from QuickSight-Author group and add it to QuickSight-Reader group.

When you test the application flow again, you log in as a reader.

Removing the user from the Azure AD group doesn’t automatically remove the registered user in QuickSight; you have to remove the user manually in the QuickSight admin console. User management inside QuickSight is documented in this article.

Deep-linking QuickSight dashboards

You can share QuickSight dashboards using the sign-on URL for the QuickSight application published in the Azure Apps portal. This allows users to federate directly into the QuickSight dashboard without having to land first on the QuickSight homepage.

To deep-link to a specific QuickSight dashboard with SSO, complete the following steps:

  1. Under Enterprise Applications, choose Amazon QuickSight.
  2. Under Manage, choose Properties.
  3. Locate the User access URL.
  4. Append a RelayState parameter containing the URL of your dashboard to the end of the user access URL. For example, https://myapps.microsoft.com/signin/Amazon%20QuickSight/a06d28e5-4aa4-4888-bb99-91d6c2c4eae8?RelayState=https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/224103be-0470-4de4-829f-390e55b3ef96.

You can test it by creating a custom sign-in URL using the RelayState parameter pointing to an existing dashboard. Make sure the user signing in to the dashboard has been granted proper access.
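
If you build these links programmatically, here is a small sketch reusing the example URLs above (percent-encoding the dashboard URL in the RelayState value is a conservative choice):

from urllib.parse import quote

# User access URL copied from the QuickSight app's Properties page in Azure AD.
user_access_url = (
    "https://myapps.microsoft.com/signin/Amazon%20QuickSight/"
    "a06d28e5-4aa4-4888-bb99-91d6c2c4eae8"
)
dashboard_url = (
    "https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/"
    "224103be-0470-4de4-829f-390e55b3ef96"
)

# Append the dashboard as the RelayState so users land directly on it after SSO.
deep_link = f"{user_access_url}?RelayState={quote(dashboard_url, safe='')}"
print(deep_link)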

Summary

This post provided step-by-step instructions to configure a federated SSO with Azure AD as the IdP. We also discussed how to map users and groups in Azure AD to IAM roles for secure access into QuickSight.

If you have any questions or feedback, please leave a comment.


About the Author

Adnan Hasan is a Global GTM Analytics Specialist at Amazon Web Services, helping customers transform their business using data, machine learning and advanced analytics. 

How Cookpad scaled its Amazon Redshift cluster while controlling costs with usage limits

Post Syndicated from Shimpei Kodama original https://aws.amazon.com/blogs/big-data/how-cookpad-scaled-its-amazon-redshift-cluster-while-controlling-costs-with-usage-limits/

This is a guest post by Shimpei Kodama, data engineer at Cookpad Inc.

Cookpad is a tech company that builds a community platform where people share recipe ideas and cooking tips. The company’s mission is to “make everyday cooking fun.” It’s one of the largest recipe-sharing platforms in Japan with over 50 million users per month, and is growing fast in other countries as well.

Cookpad has been using Amazon Redshift as its data analytics platform since 2015 to make all data accessible for employees to analyze and gain insights to delight end-users and grow its business. As of this writing, Cookpad’s Amazon Redshift cluster processes thousands of daily queries submitted by its globally distributed teams, along with over 500 batch jobs that run on the same cluster every day.

This post shares how Cookpad shortened load intervals for log data from several hours to minutes to make full logs queryable with Amazon Redshift Spectrum. We also discuss how concurrency scaling has reduced the query queue wait time by 15%. Finally, we present how we easily control costs using the Amazon Redshift pay-as-you-go pricing model.

Analyzing growing log data with Amazon Redshift Spectrum

As our service quickly grew, one of the challenges we had in late 2018 was ever-growing log data. At that time, we were loading data into about 250 log tables. The total volume of log data per month was about 3 TB after compression, and the percentage of disk usage was greater than 80%.

Because the number of log tables and the data volume kept growing, we finally reached a point where we were running out of disk space and couldn’t load into Amazon Redshift at the specified intervals or keep the full logs queryable through Amazon Redshift without adding nodes. If RA3 instances had been available, we could have solved the disk capacity problem by using them, but we didn’t have that option in 2018.

We were also loading most of the log tables at 6-hour intervals. Although some log tables required shorter intervals so we could check logs immediately after releasing a feature, we couldn’t allow that because it would have affected query performance.

With about 3 TB of new logs being added every month (after compression)—causing 80% disk usage—we needed to add more nodes, or offload unused and older data from the cluster. However, neither option was acceptable to us. Although keeping infrequently accessed data like old logs on Amazon Redshift storage might look unreasonable, we wanted to keep old logs and query them for long-term analysis.

Our solution

To tackle those challenges, we decided to move our log data to Amazon Simple Storage Service (Amazon S3) and query it with Amazon Redshift Spectrum.

We built a new data pipeline called Prism, which puts log data (in the Parquet format) into an S3 bucket instead of using Amazon Kinesis Data Firehose. One reason for this was we wanted to have log files in Amazon S3 partitioned by log generation time, rather than by log receiving time, so that delayed logs are put into the right partitions (this custom prefix was released in February 2019). Another reason was we wanted to merge many small files in Amazon S3 into fewer reasonably sized files to improve user query performance. The following diagram shows an overview of our data pipeline.

The pipeline includes the following steps:

  1. Fluentd puts the log files into an S3 bucket.
  2. Amazon S3 event notifications are sent to an Amazon Simple Notification Service (Amazon SNS) topic and an Amazon Simple Queue Service (Amazon SQS) queue, which subscribes to the SNS topic that receives the messages.
  3. The preprocessor receives the messages from the SQS queue. It processes the log files to cleanse and transform them, and saves the processed files in an S3 bucket.
  4. Amazon S3 event notifications are sent to another SQS queue.
  5. The Prism Stream receives the messages from the SQS queue. It converts the files from JSON to Parquet and saves the converted files in another S3 bucket. The log files in the S3 bucket are partitioned by log generation date. (A rough sketch of this conversion follows the list.)
  6. The Parquet log files in the S3 bucket are now available for queries through Amazon Redshift Spectrum.
  7. The Prism Merge periodically merges many small log files into reasonably-sized files.
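
The conversion in step 5 can be sketched roughly as follows, using pyarrow for illustration; the event_time column name and the output layout are assumptions, not Prism’s actual schema:

import pyarrow.compute as pc
import pyarrow.json as pj
import pyarrow.parquet as pq

def convert_log_file(json_path, output_root):
    """Convert a newline-delimited JSON log file to Parquet, partitioned by
    the log generation date rather than the receive time.

    Assumes each record carries an `event_time` timestamp field; the column
    and path names here are placeholders.
    """
    table = pj.read_json(json_path)                      # one JSON object per line
    days = pc.strftime(table["event_time"], format="%Y-%m-%d")
    table = table.append_column("dt", days)              # partition column
    pq.write_to_dataset(
        table,
        root_path=output_root,                           # e.g. "s3://bucket/curated/logs/"
        partition_cols=["dt"],
        compression="snappy",
    )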

Results

The intervals for loading logs were dramatically shortened from several hours to about 10 minutes. And full logs are now available for queries through Amazon Redshift Spectrum without adding nodes.

As a bonus, the Amazon Redshift Spectrum schema on read characteristic eliminates the need to reload the data to fix character length overflow for the varchar column, which was occasionally required for Amazon Redshift local tables.

The performance has met our SLA and enabled us to analyze data right away in Amazon S3 without having to load it first. Based on the experiments we did, the performance of Amazon Redshift Spectrum was reasonable. To be more precise, our tests showed that Amazon Redshift Spectrum took 20% longer than Amazon Redshift local tables.

Improving query response time with concurrency scaling

Another challenge we experienced was increasing query queue time, because more jobs were being initiated by our globally distributed teams. This resulted in some overloaded periods in terms of query concurrency. For example, batch jobs for the UK team start at 03:00 UTC and finish at 08:00 UTC, which translates to 12:00–17:00 in JST. As a result, the Japanese team would often experience degraded performance during that period.

To resolve this issue, we decided to enable Amazon Redshift Concurrency Scaling, which supports processing thousands of concurrent queries by automatically adding more clusters in the background whenever we need it.

We enabled it soon after AWS added the usage limits feature to Amazon Redshift, which allowed us to use concurrency scaling while controlling its costs. We set a usage limit of 1 hour per day with the Disable feature action.

After we enabled concurrency scaling, we started running over 100 queries on the concurrency scaling cluster every day. And the daily average queue wait time on the main cluster went down by 15%.

Optimizing costs for Amazon Redshift Spectrum and concurrency scaling

We purchased a Reserved Instance for our Amazon Redshift cluster, which provided a significant pricing discount. But the pricing model for both Amazon Redshift Spectrum and concurrency scaling is pay-as-you-go, so we decided to use workload management (WLM) and usage limits to control and monitor the cost to meet our budget.

For Amazon Redshift Spectrum, we configured WLM and usage limits with the Alert action. We configured WLM to stop queries that scan more than 1 TB, to prevent accidental large scans. Additionally, we configured a weekly usage limit to send an alert to our communication platform when we exceed the weekly budget for Amazon Redshift Spectrum.

The usage limits feature monitors and controls your usage and the associated cost of both Amazon Redshift Spectrum and concurrency scaling. You can create daily, weekly, and monthly usage limits, and define actions to take if those limits are reached to maintain predictable spending. Actions include logging usage stats as an event to a system table, generating Amazon SNS alerts, and disabling Amazon Redshift Spectrum or concurrency scaling based on your defined thresholds. This allows you to continue reaping the benefits provided by both Amazon Redshift Spectrum and concurrency scaling with the peace of mind that you can stay within budget simply by configuring the appropriate thresholds.
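
As an illustration, usage limits like these can also be created programmatically with boto3; the cluster identifier, amounts, and actions below are examples rather than our exact settings:

import boto3

redshift = boto3.client("redshift")

# Disable concurrency scaling after 60 minutes of daily usage.
redshift.create_usage_limit(
    ClusterIdentifier="analytics-cluster",      # example identifier
    FeatureType="concurrency-scaling",
    LimitType="time",
    Amount=60,                                   # minutes per period
    Period="daily",
    BreachAction="disable",
)

# Alert (emit a CloudWatch metric) when Redshift Spectrum scans more than 5 TB in a week.
redshift.create_usage_limit(
    ClusterIdentifier="analytics-cluster",
    FeatureType="spectrum",
    LimitType="data-scanned",
    Amount=5,                                    # terabytes per period
    Period="weekly",
    BreachAction="emit-metric",
)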

For more information, see Managing usage limits in Amazon Redshift and watch the following video.

For concurrency scaling, we set the usage limits to 1 hour per day, considering the tradeoff between cost and queue wait time. So the concurrency scaling cluster is up and running for 1 hour (and a little longer, to be exact) every day in our environment. Thankfully, for every 24 hours that our main cluster is in use, we accrue a 1-hour credit for concurrency scaling. So we are using concurrency scaling with just a small additional cost.

Conclusion

Amazon Redshift is vital for Cookpad to enable its employees to conduct self-service analytics. As I’ve described in this post, we’ve successfully expanded our cluster’s capabilities by using new features provided by AWS, without adding nodes.


About the Authors

Shimpei Kodama is a data engineer at Cookpad Inc. Shimpei is in charge of the data infrastructure for analysis in Cookpad. He delivers data and the ability to process it to his colleagues to help them improve the value of their work.

Junpei Ozono is a Senior Solutions Architect at AWS in Japan. Junpei supports customers’ journeys on the AWS Cloud and guides them to design and develop lake house architectures powered by Amazon Redshift, Amazon S3, and other AWS services.

Making ETL easier with AWS Glue Studio

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/making-etl-easier-with-aws-glue-studio/

AWS Glue Studio is an easy-to-use graphical interface that speeds up the process of authoring, running, and monitoring extract, transform, and load (ETL) jobs in AWS Glue. The visual interface allows those who don’t know Apache Spark to design jobs without coding experience and accelerates the process for those who do.

AWS Glue Studio was designed to help you create ETL jobs easily. After you design a job in the graphical interface, it generates Apache Spark code for you, abstracting away the challenges of coding. When the job is ready, you can run it and monitor the job status using the integrated UI.

AWS Glue Studio supports different types of data sources, both structured and semi-structured, and offers data processing in real time and batch. You can extract data from sources like Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), Amazon Kinesis, and Apache Kafka. It also offers Amazon S3 and tables defined in the AWS Glue Data Catalog as destinations.

This post shows you how to create an ETL job to extract, filter, join, and aggregate data easily using AWS Glue Studio.

About this blog post

  • Time to read: 15 minutes
  • Time to complete: 45 minutes
  • Cost to complete (estimated): Amazon S3: $0.023; AWS Glue: $0.036; AWS Identity & Access Management: $0; Total: $0.059
  • Learning level: Intermediate (200)
  • Services used: AWS Glue, Amazon S3, AWS Identity and Access Management

Overview of solution

To demonstrate how to create an ETL job using AWS Glue Studio, we use the Toronto parking tickets dataset, specifically the data about parking tickets issued in the city of Toronto in 2018, and the trials dataset, which contains all the information about the trials related to those parking tickets. The goal is to filter, join, and aggregate the two datasets to get the number of parking tickets handled per court in the city of Toronto during that year.

Prerequisites

For this walkthrough, you should have an AWS account. For this post, you launch the required AWS resources using AWS CloudFormation in the us-east-1 Region. If you haven’t signed up for AWS, complete the following tasks:

  1. Create an account.
  2. Create an AWS Identity and Access Management (IAM) user. For instructions, see Create an IAM User.

Important: If the AWS account you use to follow this guide uses AWS Lake Formation to manage permissions on the Glue data catalog, make sure that you log in as a user that is both a Data lake administrator and a Database creator, as described in the documentation.

Launching your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your stack in us-east-1.
  2. Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names option.
  3. Choose Create stack.

Launching this stack creates AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next steps:

  • AWSGlueStudioRole – IAM role to run AWS Glue jobs
  • AWSGlueStudioS3Bucket – Name of the S3 bucket to store blog-related files
  • AWSGlueStudioTicketsYYZDB – AWS Glue Data Catalog database
  • AWSGlueStudioTableTickets – Data Catalog table to use as a source
  • AWSGlueStudioTableTrials – Data Catalog table to use as a source
  • AWSGlueStudioParkingTicketCount – Data Catalog table to use as the destination

Creating a job

A job is the AWS Glue component that allows the implementation of business logic to transform data as part of the ETL process. For more information, see Adding Jobs in AWS Glue.

To create an AWS Glue job using AWS Glue Studio, complete the following steps:

  1. On the AWS Management Console, choose Services.
  2. Under Analytics, choose AWS Glue.
  3. In the navigation pane, choose AWS Glue Studio.
  4. On the AWS Glue Studio home page, choose Create and manage jobs.

AWS Glue Studio supports different sources, including Amazon S3, Amazon RDS, Amazon Kinesis, and Apache Kafka. For this post, you use two AWS Glue tables as data sources and one S3 bucket as the destination.

  1. In the Create Job section, select Blank graph.
  2. Choose Create.

This takes you to the Visual Canvas to create an AWS Glue job.

  1. Change the Job name from Untitled Job to YYZ-Tickets-Job.

You now have an AWS Glue job ready to filter, join, and aggregate data from two different sources.

Adding sources

For this post, you use two AWS Glue tables as data sources: Tickets and Trials, which the CloudFormation template created. The data is located in an external S3 bucket in Parquet format. To add these tables as sources, complete the following steps:

  1. Choose the (+) icon.
  2. On the Node properties tab, for Name, enter Tickets.
  3. For Node type, choose S3.
  4. On the Data Source properties - S3 tab, for Database, choose yyz-tickets.
  5. For Table, choose tickets.
  6. For Partition predicate (optional), leave blank.

Before adding the second data source to the ETL job, be sure that the node you just created isn’t selected.

  1. Choose the (+) icon.
  2. On the Node properties tab, for Name, enter Trials.
  3. For Node type, choose S3.
  4. On the Data Source properties - S3 tab, for Database, choose yyz-tickets.
  5. For Table, choose trials.
  6. For Partition predicate (optional), leave blank.

You now have two AWS Glue tables as the data sources for the AWS Glue job.

Adding transforms

A transform is the AWS Glue Studio component where the data is modified. You can use the different transforms that are part of this service or add custom code. To add transforms, complete the following steps:

  1. Choose the Tickets node.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Ticket_Mapping.
  4. For Node type, choose ApplyMapping.
  5. For Node parents, choose Tickets.
  6. On the Transform tab, change the ticket_number data type from decimal to int.
  7. Drop the following columns:
    • Location1
    • Location2
    • Location3
    • Location4
    • Province

Now you add a second ApplyMapping transform to modify the Trials data source.

  1. Choose the Trials data source node.
  2. Choose the (+) icon.
  3. On the Node properties tab, for Name, enter Trial_Mapping.
  4. For Node type, choose ApplyMapping.
  5. For Node parents, leave at default value (Trials).
  6. On the Transform tab, change the parking_ticket_number data type from long to int.

Now that you have set the right data types and removed some of the columns, it’s time to join the data sources using the Join transform.

  1. Choose the Ticket_Mapping transform.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Join_Ticket_Trial.
  4. For Node type, choose Join.
  5. For Node parents, choose Ticket_Mapping and Trial_Mapping.
  6. On the Transform tab, for Join type, choose Inner join.
  7. For Join conditions, choose Add condition.
  8. For Ticket_Mapping, choose ticket_number.
  9. For Trial_Mapping, choose parking_ticket_number.

Now the two data sources are joined by the ticket_number and parking_ticket_number columns.
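
For reference, the script that AWS Glue Studio generates for these nodes is roughly equivalent to the following PySpark sketch (trimmed and illustrative; column names and types other than the two casts described above are assumptions):

from awsglue.context import GlueContext
from awsglue.transforms import ApplyMapping, Join
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Source nodes: read both tables from the AWS Glue Data Catalog.
tickets = glueContext.create_dynamic_frame.from_catalog(
    database="yyz-tickets", table_name="tickets")
trials = glueContext.create_dynamic_frame.from_catalog(
    database="yyz-tickets", table_name="trials")

# Ticket_Mapping: cast ticket_number to int (other columns trimmed for brevity).
ticket_mapping = ApplyMapping.apply(
    frame=tickets,
    mappings=[("ticket_number", "decimal", "ticket_number", "int"),
              ("infraction_code", "long", "infraction_code", "long"),
              ("infraction_description", "string", "infraction_description", "string")])

# Trial_Mapping: cast parking_ticket_number from long to int.
trial_mapping = ApplyMapping.apply(
    frame=trials,
    mappings=[("parking_ticket_number", "long", "parking_ticket_number", "int"),
              ("court_location", "string", "court_location", "string")])

# Join_Ticket_Trial: inner join on the two ticket number columns.
joined = Join.apply(frame1=ticket_mapping, frame2=trial_mapping,
                    keys1=["ticket_number"], keys2=["parking_ticket_number"])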

Performing data aggregation

In this step, you do some data aggregation to see the number of tickets handled per court in Toronto.

AWS Glue Studio offers the option of adding custom code for use cases that need a more complex transformation. For this post, we use PySpark code to do the data transformation. It contains Spark SQL code and a combination of dynamic frames and data frames.

  1. Choose the Join_Ticket_Trial transform.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Aggregate_Tickets.
  4. For Node type, choose Custom transform.
  5. For Node parents, leave Join_Ticket_Trial selected.
  6. On the Transform tab, for Code block, change the function name from MyTransform to Aggregate_Tickets.
  7. Enter the following code:
    # Convert the first (and only) DynamicFrame in the collection to a Spark DataFrame
    selected = dfc.select(list(dfc.keys())[0]).toDF()
    # Register it as a temporary view so it can be queried with Spark SQL
    selected.createOrReplaceTempView("ticketcount")
    # Aggregate the number of tickets per court location and infraction
    totals = spark.sql("select court_location as location, infraction_description as infraction, count(infraction_code) as total  FROM ticketcount group by infraction_description, infraction_code, court_location order by court_location asc")
    # Wrap the result back into a DynamicFrame and return it as a collection
    results = DynamicFrame.fromDF(totals, glueContext, "results")
    return DynamicFrameCollection({"results": results}, glueContext)
    

After adding the custom transformation to the AWS Glue job, you want to store the result of the aggregation in the S3 bucket. To do this, you need a Select from collection transform to read the output from the Aggregate_Tickets node and send it to the destination.

  1. Choose the New node node.
  2. Leave the Transform tab with the default values.
  3. On the Node Properties tab, change the name of the transform to Select_Aggregated_Data.
  4. Leave everything else with the default values.
  5. Choose the Select_Aggregated_Data node.
  6. Choose the (+) icon.

  7. On the Node properties tab, for Name, enter Ticket_Count_Dest.
  8. For Node type, choose S3 in the Data target section.
  9. For Node parents, choose Select_Aggregated_Data.
  10. On the Data Target properties - S3 tab, for Format, choose Parquet.
  11. For Compression Type, choose GZIP.
  12. For S3 Target Location, enter s3://glue-studio-blog-{Your Account ID as a 12-digit number}/parking_tickets_count/.

The job should look like the following screenshot.

You now have three transforms to do data mapping, filtering, and aggregation.

Configuring the job

When the logic behind the job is complete, you must set the parameters for the job run. In this section, you configure the job by selecting components such as the IAM role and the AWS Glue version you use to run the job.

  1. On the Job details tab, for Description, enter Glue Studio blog post job.
  2. For IAM Role, choose AWSGlueStudioRole (which the CloudFormation template created).
  3. For Job Bookmark, choose Disable.
  4. For Number of retries, optionally enter 1.
  5. Choose Save.
  6. When the job is saved, choose Run.

Monitoring the job

AWS Glue Studio offers a job monitoring dashboard that provides comprehensive information about your jobs. You can get job statistics and see detailed information about the job and its status while it’s running.

  1. In the AWS Glue Studio navigation panel, choose Monitoring.
  2. Choose the entry with the job name YYZ-Tickets-Job.
  3. To get more details about the job run, choose View run details.
  4. Wait until Run Status changes to Succeeded.

You can verify that the job ran successfully on the Amazon Athena console.

  1. On the Athena console, choose the yyz-tickets database.
  2. Choose the icon next to the parking_tickets_count table (which the CloudFormation template created).

For more information about creating AWS Glue tables, see Defining Tables in the AWS Glue Data Catalog.

  3. Choose Preview table.

As you can see in the following screenshot, the information that the job generated is available, and you can query the number of ticket types per court issued in the city of Toronto in 2018.

Cleaning up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, and AWS Glue job.

Conclusion

In this post, you learned how to use AWS Glue Studio to create an ETL job. You can use AWS Glue Studio to speed up the ETL job creation process and allow different personas to transform data without any previous coding experience. For more information about AWS Glue Studio, see the AWS Glue Studio documentation and What’s New with AWS.


About the author

Leonardo Gómez is a Senior Analytics Specialist Solutions Architect at AWS. Based in Toronto, Canada, he works with customers across Canada to design and build big data solutions.

Automating bucketing of streaming data using Amazon Athena and AWS Lambda

Post Syndicated from Ahmed Saef Zamzam original https://aws.amazon.com/blogs/big-data/automating-bucketing-of-streaming-data-using-amazon-athena-and-aws-lambda/

In today’s world, data plays a vital role in helping businesses understand and improve their processes and services to reduce cost. You can use several tools to gain insights from your data, such as Amazon Kinesis Data Analytics or open-source frameworks like Apache Spark Structured Streaming and Apache Flink to analyze the data in real time. Alternatively, you can batch analyze the data by ingesting it into a centralized storage known as a data lake. Data lakes allow you to import any amount of data that can come in real time or batch. With Amazon Simple Storage Service (Amazon S3), you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% (11 9s) of durability.

After the data lands in your data lake, you can start processing this data using any Big Data processing tool of your choice. Amazon Athena is a fully managed interactive query service that enables you to analyze data stored in an Amazon S3-based data lake using standard SQL. You can also integrate Athena with Amazon QuickSight for easy visualization of the data.

When working with Athena, you can employ a few best practices to reduce cost and improve performance. Converting to columnar formats, partitioning, and bucketing your data are some of the best practices outlined in Top 10 Performance Tuning Tips for Amazon Athena. Bucketing is a technique that groups data based on specific columns together within a single partition. These columns are known as bucket keys. By grouping related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thus improving query performance and reducing cost. For example, imagine collecting and storing clickstream data. If you frequently filter or aggregate by user ID, then within a single partition it’s better to store all rows for the same user together. If user data isn’t stored together, then Athena has to scan multiple files to retrieve the user’s records. This leads to more files being scanned, and therefore, an increase in query runtime and cost.

Like partitioning, columns that are frequently used to filter the data are good candidates for bucketing. However, unlike partitioning, with bucketing it’s better to use columns with high cardinality as a bucketing key. For example, Year and Month columns are good candidates for partition keys, whereas userID and sensorID are good examples of bucket keys. By doing this, you make sure that all buckets have a similar number of rows. For more information, see Bucketing vs Partitioning.

For real-time data (such as data coming from sensors or clickstream data), streaming tools like Amazon Kinesis Data Firehose can convert the data to columnar formats and partition it while writing to Amazon S3. With Kafka, you can do the same thing with connectors. But what about bucketing? This post shows how to continuously bucket streaming data using AWS Lambda and Athena.

Overview of solution

The following diagram shows the high-level architecture of the solution.

The architecture includes the following steps:

  1. We use the Amazon Kinesis Data Generator (KDG) to simulate streaming data. Data is then written into Kinesis Data Firehose; a fully managed service that enables you to load streaming data to an Amazon S3-based data lake.
  2. Kinesis Data Firehose partitions the data by hour and writes new JSON files into the current partition under a /raw prefix. Each new partition looks like /raw/dt=<YYYY-MM-dd-HH>. Every hour, a new partition is created.
  3. Two Lambda functions are triggered on an hourly basis by Amazon CloudWatch Events:
    • Function 1 (LoadPartition) runs every hour to load new /raw partitions to Athena SourceTable, which points to the /raw prefix.
    • Function 2 (Bucketing) runs the Athena CREATE TABLE AS SELECT (CTAS) query.
  4. The CTAS query copies the previous hour’s data from /raw to /curated and buckets the data while doing so. It loads the new data as a new partition to TargetTable, which points to the /curated prefix.

Overview of walkthrough

In this post, we cover the following high-level steps:

  1. Install and configure the KDG.
  2. Create a Kinesis Data Firehose delivery stream.
  3. Create the database and tables in Athena.
  4. Create the Lambda functions and schedule them.
  5. Test the solution.
  6. Create a view that combines data from both tables.
  7. Clean up.

Installing and configuring the KDG

First, we need to install and configure the KDG in our AWS account. To do this, we use the following AWS CloudFormation template.

For more information about installing the KDG, see the KDG Guide in GitHub.

To configure the KDG, complete the following steps:

  1. On the AWS CloudFormation console, locate the stack you just created.
  2. On the Outputs tab, record the value for KinesisDataGeneratorUrl.
  3. Log in to the KDG main page using the credentials created when you deployed the CloudFormation template.
  4. In the Record template section, enter the following template. Each record has three fields: sensorID, currentTemperature, and status.
    {
        "sensorId": {{random.number(4000)}},
        "currentTemperature": {{random.number(
            {
                "min":10,
                "max":50
            }
        )}},
        "status": "{{random.arrayElement(
            ["OK","FAIL","WARN"]
        )}}"
    }
    

  5. Choose Test template.

The result should look like the following screenshot.

We don’t start sending data now; we do this after creating all other resources.

Creating a Kinesis Data Firehose delivery stream

Next, we create the Kinesis Data Firehose delivery stream that is used to load the data to the S3 bucket.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose.
  2. Choose Create delivery stream.
  3. For Delivery stream name, enter a name, such as AutoBucketingKDF.
  4. For Source, select Direct PUT or other sources.
  5. Leave all other settings at their default and choose Next.
  6. On the Process Records page, leave everything at its default and choose Next.
  7. Choose Amazon S3 as the destination and choose your S3 bucket from the drop-down menu (or create a new one). For this post, I already have a bucket created.
  8. For S3 Prefix, enter the following prefix:
    raw/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}-!{timestamp:HH}/

We use custom prefixes to tell Kinesis Data Firehose to create a new partition every hour. Each partition looks like this: dt=YYYY-MM-dd-HH. This partition-naming convention conforms to the Hive partition-naming convention, <PartitionKey>=<PartitionValue>. In this case, <PartitionKey> is dt and <PartitionValue> is YYYY-MM-dd-HH. By doing this, we implement a flat partitioning model instead of hierarchical (year=YYYY/month=MM/day=dd/hour=HH) partitions. This model can be much simpler for end-users to work with, and you can use a single column (dt) to filter the data. For more information on flat vs. hierarchical partitions, see Data Lake Storage Foundation on GitHub.

  9. For S3 error prefix, enter the following prefix:
    myFirehoseFailures/!{firehose:error-output-type}/

  10. On the Settings page, leave everything at its default.
  11. Choose Create delivery stream.

Creating an Athena database and tables

In this solution, the Athena database has two tables: SourceTable and TargetTable. Both tables have identical schemas and will have the same data eventually. However, each table points to a different S3 location. Moreover, because data is stored in different formats, Athena uses a different SerDe for each table to parse the data. SourceTable uses JSON SerDe and TargetTable uses Parquet SerDe. One other difference is that SourceTable’s data isn’t bucketed, whereas TargetTable’s data is bucketed.

In this step, we create both tables and the database that groups them.

  1. On the Athena console, create a new database by running the following statement:
    CREATE DATABASE mydatabase

  2. Choose the database that was created and run the following query to create SourceTable. Replace <s3_bucket_name> with the bucket name you used when creating the Kinesis Data Firehose delivery stream.
    CREATE EXTERNAL TABLE mydatabase.SourceTable(
      sensorid string, 
      currenttemperature int, 
      status string)
    PARTITIONED BY ( 
      dt string)
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://<s3_bucket_name>/raw/'
    

  3. Run the following CTAS statement to create TargetTable:
    CREATE TABLE TargetTable
    WITH (
          format = 'PARQUET', 
          external_location = 's3://<s3_bucket_name>/curated/', 
          partitioned_by = ARRAY['dt'], 
          bucketed_by = ARRAY['sensorID'], 
          bucket_count = 3) 
    AS SELECT *
    FROM SourceTable

SourceTable doesn’t have any data yet. However, the preceding query creates the table definition in the Data Catalog. We configured this data to be bucketed by sensorID (bucketing key) with a bucket count of 3. Ideally, choose the number of buckets so that the resulting files are of optimal size.
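As a rough way to pick a bucket count (an assumption on our part, not a prescription from the Athena documentation), you can divide the expected size of one partition by a target file size of roughly 128 MB, which is in the ballpark that Athena performance guidance suggests for individual files:

import math

# Back-of-the-envelope bucket count; the sizes below are illustrative
# assumptions, not measured values from this workload.
hourly_partition_size_mb = 384   # expected data volume per hourly partition
target_file_size_mb = 128        # rough target for efficient Athena scans

bucket_count = max(1, math.ceil(hourly_partition_size_mb / target_file_size_mb))
print(bucket_count)  # -> 3, the bucket_count used in this post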

Creating Lambda functions

The solution has two Lambda functions: LoadPartition and Bucketing. We use an AWS Serverless Application Model (AWS SAM) template to create, deploy, and schedule both functions.

Follow the instructions in the GitHub repo to deploy the template. When deploying the template, it asks you for some parameters. You can use the default parameters, but you have to change S3BucketName and AthenaResultLocation. For more information, see Parameter Details in the GitHub repo.

LoadPartition function

The LoadPartition function is scheduled to run at the first minute of every hour. Every time Kinesis Data Firehose creates a new partition in the /raw folder, this function loads the new partition to SourceTable. This is crucial because the second function (Bucketing) reads this partition the following hour to copy the data to /curated.
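The actual implementation is in the GitHub repo; the following is only a minimal sketch of the idea, assuming a boto3 Athena client, the same <s3_bucket_name> placeholder used throughout this post, and an Athena query result location of your choosing (the AthenaResultLocation parameter in the AWS SAM template).

import boto3
from datetime import datetime, timezone

athena = boto3.client("athena")
BUCKET = "<s3_bucket_name>"  # same bucket used throughout this solution

def lambda_handler(event, context):
    # Partition for the hour Kinesis Data Firehose is currently writing to
    # (Firehose prefix timestamps are evaluated in UTC).
    current_hour = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H")

    query = (
        f"ALTER TABLE SourceTable ADD IF NOT EXISTS "
        f"PARTITION (dt='{current_hour}') "
        f"LOCATION 's3://{BUCKET}/raw/dt={current_hour}/'"
    )

    return athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "mydatabase"},
        ResultConfiguration={"OutputLocation": "s3://<athena_result_location>/"},
    )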

Bucketing function

The Bucketing function is scheduled to run at the first minute of every hour. It copies the last hour’s data from SourceTable to TargetTable. It does so by creating a TempTable using a CTAS query. This TempTable points to the new date-hour folder under /curated; this folder is then added as a single partition to TargetTable.

To implement this, the function runs three queries sequentially. The queries use two parameters:

  • <s3_bucket_name> – Defined by an AWS SAM parameter and should be the same bucket used throughout this solution
  • <last_hour_partition> – Calculated by the function based on the hour in which it runs

The function first creates TempTable as the result of a SELECT statement from SourceTable. It stores the results in a new folder under /curated. The results are bucketed and stored in Parquet format. See the following code:

CREATE TABLE TempTable
    WITH (
      format = 'PARQUET', 
      external_location = 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/', 
      bucketed_by = ARRAY['sensorID'], 
      bucket_count = 3) 
    AS SELECT *
    FROM SourceTable
    WHERE dt='<last_hour_partition>';

This creates a new subfolder in /curated, which is a new partition for TargetTable. So, after the TempTable creation is complete, we load the new partition to TargetTable:

ALTER TABLE TargetTable
                ADD IF NOT EXISTS
                PARTITION (dt='<last_hour_partition>');

Finally, we delete TempTable from the Data Catalog:

DROP TABLE TempTable
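Putting the three queries together, the Bucketing function’s logic looks roughly like the following sketch. This is an illustration only (the real code is in the GitHub repo); it assumes a boto3 Athena client and a small helper that blocks until each query finishes.

import time
import boto3
from datetime import datetime, timedelta, timezone

athena = boto3.client("athena")
BUCKET = "<s3_bucket_name>"

def run_query(sql):
    """Start an Athena query and wait for it to reach a terminal state."""
    qid = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "mydatabase"},
        ResultConfiguration={"OutputLocation": "s3://<athena_result_location>/"},
    )["QueryExecutionId"]
    while True:
        state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        time.sleep(2)

def lambda_handler(event, context):
    # The partition that Kinesis Data Firehose wrote during the previous hour.
    last_hour = (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%d-%H")

    run_query(f"""
        CREATE TABLE TempTable
        WITH (format = 'PARQUET',
              external_location = 's3://{BUCKET}/curated/dt={last_hour}/',
              bucketed_by = ARRAY['sensorID'],
              bucket_count = 3)
        AS SELECT * FROM SourceTable WHERE dt = '{last_hour}'""")
    run_query(f"ALTER TABLE TargetTable ADD IF NOT EXISTS PARTITION (dt='{last_hour}')")
    run_query("DROP TABLE TempTable")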

Testing the solution

Now that we have created all resources, it’s time to test the solution. We start by generating data from the KDG and waiting for an hour to start querying data in TargetTable (the bucketed table).

  1. Log in to the KDG. You should find the template you created earlier. For the configuration, choose the following:
    1. For the Region, choose the Region you used.
    2. For the delivery stream, choose the Kinesis Data Firehose you created earlier.
    3. For records/sec, enter 3000.
  2. Choose Send data.

The KDG starts sending simulated data to Kinesis Data Firehose. After 1 minute, a new partition should be created in Amazon S3.

The Lambda function that loads the partition to SourceTable runs on the first minute of the hour. If you started sending data after the first minute, this partition is missed because the next run loads the next hour’s partition, not this one. To mitigate this, run MSCK REPAIR TABLE SourceTable only for the first hour.

  1. To benchmark the performance between both tables, wait for an hour so that the data is available for querying in TargetTable.
  2. When the data is available, choose one sensorID and run the following query on SourceTable and TargetTable.
    SELECT sensorID, avg(currenttemperature) as AverageTemperature 
    FROM <TableName>
    WHERE dt='<YYYY-MM-dd-HH>' AND sensorID ='<sensorID_selected>'
    GROUP BY 1

The following screenshot shows the query results for SourceTable. It shows the runtime in seconds and amount of data scanned.

The following screenshot shows the query results for TargetTable.

If you look at these results, you don’t see a huge difference in runtime for this specific query and dataset; for other datasets, this difference should be more significant. However, from a data scanning perspective, after bucketing the data, we reduced the data scanned by approximately 98%. Therefore, for this specific use case, bucketing the data led to a 98% reduction in Athena costs, because you’re charged based on the amount of data scanned by each query.
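To put that reduction in concrete terms, Athena pricing is based on the amount of data scanned (roughly $5 per TB in most Regions at the time of writing; check the pricing page for your Region). The numbers in the following sketch are illustrative assumptions, not measurements from this workload:

# Illustrative cost calculation; plug in your own query sizes and the
# current Athena price for your Region.
price_per_tb_usd = 5.00
tb_scanned_unbucketed = 0.001                       # e.g. ~1 GB scanned per query
tb_scanned_bucketed = tb_scanned_unbucketed * 0.02  # ~98% less data scanned

print(f"unbucketed: ${tb_scanned_unbucketed * price_per_tb_usd:.4f} per query")
print(f"bucketed:   ${tb_scanned_bucketed * price_per_tb_usd:.4f} per query")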

Querying the current hour’s data

Data for the current hour isn’t available immediately in TargetTable. It’s available for querying after the first minute of the following hour. To query this data immediately, we have to create a view that UNIONs the previous hours’ data from TargetTable with the current hour’s data from SourceTable. If you don’t need the data until an hour after it arrives, you don’t need to create this view.

To create this view, run the following query in Athena:

CREATE OR REPLACE VIEW combined AS

SELECT *, "$path" AS file
FROM SourceTable
WHERE dt >= date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

UNION ALL 

SELECT *, "$path" AS file
FROM TargetTable
WHERE dt < date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

Cleaning up

Delete the resources you created if you no longer need them.

  1. Delete the Kinesis Data Firehose delivery stream.
  2. In Athena, run the following statements
    1. DROP DATABASE mydatabase
    2. DROP TABLE SourceTable
    3. DROP TABLE TargetTable
  3. Delete the AWS SAM template to delete the Lambda functions.
  4. Delete the CloudFormation stack for the KDG. For more information, see Deleting a stack on the AWS CloudFormation console.

Conclusion

Bucketing is a powerful technique and can significantly improve performance and reduce Athena costs. In this post, we saw how to continuously bucket streaming data using Lambda and Athena. We used a simulated dataset generated by Kinesis Data Generator. The same solution can apply to any production data, with the following changes:

  • The DDL statements
  • The Lambda functions, which assume data partitioned by hour with the partition key dt and the partition value <YYYY-MM-dd-HH>; if your data is partitioned differently, edit the functions accordingly
  • The frequency of the Lambda triggers

About the Author

Ahmed Zamzam is a Solutions Architect with Amazon Web Services. He supports SMB customers in the UK in their digital transformation and their cloud journey to AWS, and specializes in Data Analytics. Outside of work, he loves traveling, hiking, and cycling.


Bringing the power of embedded analytics to your apps and services with Amazon QuickSight

Post Syndicated from Dorothy Li original https://aws.amazon.com/blogs/big-data/bringing-the-power-of-embedded-analytics-to-your-apps-and-services-with-amazon-quicksight/

In the world we live in today, companies need to quickly react to change—and to anticipate it. Customers tell us that their reliance on data has never been greater than what it is today. To improve your decision-making, you have two types of data transformation needs: data agility, the speed at which data turns into insights, and data transparency, the need to present insights to decision makers. Going forward, we expect data transformation projects to become a centerpiece in every organization, big or small.

Furthermore, applications are migrating to the cloud faster than ever. Applications need to scale quickly to potentially millions of users, have global availability, manage petabytes of data, and respond in milliseconds. Such modern applications are built with a combination of these new architecture patterns, operational models, and software delivery processes, and allow businesses to innovate faster while reducing risk, time-to-market, and total cost of ownership.

An emerging area from these two trends is to combine the power of application modernization with data transformation. This emerging trend is often called embedded analytics, and is the focus of this post.

The case for embedded analytics

Applications generate a high volume of structured and unstructured data. This could be clickstream data, sales data, data from IoT devices, social data, and more. Customers who are building these applications (such as software-as-a-service (SaaS) apps or enterprise portals) often tell us that their end-users find it challenging to derive meaning from this data because traditional business intelligence (BI) approaches don’t always work.

Traditional BI tools live in disparate systems and require data engineering teams to provide connectivity and continuous integration with the application, adding to complexity and delays in the overall process. Even after the connectivity is built, you must switch back and forth between your application and the BI tool, causing frustration and decreasing the overall pace of decision-making. Customers tell us that their development teams are constantly looking for new ways to delight their users, and embedding the BI capability directly into their applications is one of the most requested asks from their end-users.

Given the strategic importance of this capability, you can use it to differentiate and up-sell as a new service in your applications. Gartner research demonstrates that 63% of CEOs expect to adopt a product-as-a-service model in the next two years, making this a major market opportunity. For example, if you provide financial services software, you can empower users to perform detailed analysis of portfolio performance trends. An HR solution might enable managers to visualize and predict turnover rates. A supply chain management solution could embed the ability to slice and dice KPIs and better understand the efficiency of logistics routes.

Comparing common approaches to embedded analytics

The approach to building an embedded analytics capability needs to deliver on the requirements of modern applications. It must be scalable, handle large amounts of data without compromising agility, and seamlessly integrate with the application’s user experience. Choosing the right methodology becomes especially important in the face of these needs.

You can build your own embedded analytics solution, but although this gives you maximum control, it has a number of disadvantages. You have to hire specialized resources (such as data engineers for building data connectivity and UX developers for building dashboards) and maintain dedicated infrastructure to manage the data processing needs of the application. This can be expensive, resource-intensive, and complex to build.

Embedding traditional BI solutions that are available in the market has limitations as well, because they’re not purpose-built for embedding use cases. Most solutions are server-based, meaning that they’re challenging to scale and require additional infrastructure setup and ongoing maintenance. These solutions also have restrictive, pay-per-server pricing, which doesn’t fully meet the needs of end-users that are consuming applications or portals via a session-based usage model.

A new approach to embedded analytics

At AWS re:Invent 2019, we launched new capabilities in Amazon QuickSight that make it easy to embed analytics into your applications and portals, empowering your customers to gain deeper insights into your application’s data. Unlike building your own analytics solution, which can be time-consuming and hard to scale, QuickSight allows you to quickly embed interactive dashboards and visualizations into your applications without compromising on the ability to personalize the look and feel of these new features.
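In practice, embedding a dashboard typically involves your application’s backend requesting a short-lived embed URL from the QuickSight API and passing it to the frontend embedding SDK. The following is a minimal sketch using boto3; the account ID, dashboard ID, and session lifetime are placeholders and assumptions for illustration.

import boto3

quicksight = boto3.client("quicksight")

# Request a short-lived URL for an existing dashboard; the frontend then
# loads this URL with the QuickSight embedding SDK (for example, in an iframe).
response = quicksight.get_dashboard_embed_url(
    AwsAccountId="111122223333",          # placeholder account ID
    DashboardId="<your-dashboard-id>",    # placeholder dashboard ID
    IdentityType="IAM",                   # the caller's IAM identity maps to a QuickSight user
    SessionLifetimeInMinutes=60,
)
embed_url = response["EmbedUrl"]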

QuickSight has a serverless architecture that automatically scales your applications from a few to hundreds of thousands of users without the need to build, set up, and manage your own analytics infrastructure. These capabilities allow you to deliver embedded analytics at hyperscale. So, why does hyperscale matter? Traditional BI tools run on a fixed amount of hardware resources, so more users, more concurrency, or more complex queries impact performance for all users and require you to add more capacity (leading to higher costs).

The following diagram illustrates a traditional architecture, which requires additional servers (and higher upfront cost) to scale.

With QuickSight, you have access to the power and scale of the AWS Cloud. You get auto scaled, consistent performance no matter the concurrency or scale of the userbase, and a truly pay-per-use architecture, meaning you only pay when your users access the dashboards or reports. The following diagram illustrates how QuickSight scales seamlessly with its serverless architecture, powered by the AWS cloud.

Furthermore, QuickSight enables your users to perform machine learning based insights such as anomaly detection, forecasting, and natural language queries. It also has a rich set of APIs that allow you to programmatically manage your analytics workflows, such as moving dashboards across accounts, automating deployments, and managing access for users with single sign-on (SSO).

New features in QuickSight Embedded Analytics

We recently announced the launch of additional embedding capabilities that allow you to do even more with QuickSight embedded analytics. QuickSight now allows you to embed dashboard authoring within applications (such as SaaS applications and enterprise portals), so you can empower your end-users to create their own visualizations and reports.

These ad hoc data analysis and self-service data exploration capabilities mean you don’t have to repeatedly create custom dashboards based on requests from your end-users, and can provide end-users with even greater agility and transparency with their data. This capability helps create product differentiation and up-sell opportunities within customer applications.

With this launch, QuickSight also provides namespaces, a multi-tenant capability that allows you to easily maintain data isolation while supporting multiple workloads within the same QuickSight account. For example, if you’re an independent software vendor (ISV), you can now assign dedicated namespaces to different customers within the same QuickSight account. This allows you to securely manage multiple customer workloads as users (authors or readers) within one namespace, and they can only discover and share content with other users within the same namespace, without exposing any data to other parties.
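As a rough illustration of the namespace model, an ISV might create one namespace per customer and register that customer’s users into it. The sketch below uses boto3 with placeholder values and is only meant to show the shape of the API calls, not a production setup.

import boto3

quicksight = boto3.client("quicksight")
ACCOUNT_ID = "111122223333"  # placeholder account ID

# Create an isolated namespace for one tenant.
quicksight.create_namespace(
    AwsAccountId=ACCOUNT_ID,
    Namespace="customer-a",
    IdentityStore="QUICKSIGHT",
)

# Register a reader for that tenant; they can only discover and share
# content within the "customer-a" namespace.
quicksight.register_user(
    AwsAccountId=ACCOUNT_ID,
    Namespace="customer-a",
    IdentityType="QUICKSIGHT",
    Email="analyst@example.com",   # placeholder email
    UserRole="READER",
    UserName="customer-a-analyst",
)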

Without namespaces, you could set up your own embedded dashboards for hundreds of thousands of users with QuickSight. For example, see the following dashboard for our fictional company, Oktank Analytica.

With namespaces in place, you can extend this to provide ad-hoc authoring capabilities using curated datasets specific to each customer, created and shared by the developer or ISV. See the following screenshot.

For more information about these new features, see Embed multi-tenant analytics in applications with Amazon QuickSight.

Customer success stories

Customers are already using embedded analytics in QuickSight to great success. In this section, we share the stories of a few customers.

Blackboard

Blackboard is a leading EdTech company, serving higher education, K-12, business, and government clients around the world.

“The recent wave in digital transformation in the global education community has made it clear that it’s time for a similar transformation in the education analytics tools that support that community,” says Rachel Scherer, Sr. Director of Data & Analytics at Blackboard. “We see a need to support learners, teachers, and leaders in education by helping to change their relationship with data and information—to reduce the distance between information and experience, between ‘informed’ and ‘acting.’

“A large part of this strategy involves embedding information directly where our users are collaborating, teaching, and learning—providing tools and insights that aid in assessment, draw attention to opportunities learners may be missing, and help strategic and academic leadership identify patterns and opportunities for intervention. We’re particularly interested in making the experience of being informed much more intuitive—favoring insight-informed workflows and/or embedded prose over traditional visualizations that require interpretation.

“By removing the step of interpretation, embedded visualizations make insights more useful and actionable. With QuickSight, we were able to deliver on our promise of embedding visualizations quickly, supporting the rapid iteration that we require, at the large scale needed to support our global user community.”

For more information about Blackboard’s QuickSight use case, see the AWS Online Tech Talk Embedding Analytics in your Applications with Amazon QuickSight at the 25:50 mark.

Comcast

Syndication Insights (SI) enables Comcast’s syndicated partners to access the same level of rich data insights that Comcast uses for platform and operational improvements.

“The SI platform enables partners to gain deeper business insights, such as early detection into anomalies for users, while ensuring a seamless experience through embedded, interactive reports,” says Ajay Gavagal, Sr. Manager of Software Development at Comcast. “From the start, scalability was a core requirement for us. We chose QuickSight as it is scalable, enabling SI to extend to multiple syndicated partners without having to provision or manage additional infrastructure. Furthermore, QuickSight provides interactive dashboards that can be easily embedded into an application. Lastly, QuickSight’s rich APIs abstract away a lot of functionality that would otherwise need to be custom built.”

For more information about how Comcast uses QuickSight, see the AWS Online Tech Talk Embedding Analytics in your Applications with Amazon QuickSight at the 38:05 mark.

Panasonic Avionics Corporation

Panasonic Avionics Corporation provides customized in-flight entertainment and communications systems to more than 300 airlines worldwide.

“Our cloud-based solutions collect large amounts of anonymized data that help us optimize the experience for both our airline partners and their passengers,” says Anand Desikan, Director of Cloud Operations at Panasonic Avionics Corporation. “We started using Amazon QuickSight to report on in-flight Wi-Fi performance, and with its rich APIs, pay-per-session pricing, and ability to scale, we quickly rolled out QuickSight dashboards to hundreds of users. The constant evolution of the platform has been impressive: ML-powered anomaly detection, Amazon SageMaker integration, embedding, theming, and cross-visual filtering. Our users consume insights via natural language narratives, which allows them to read all their information right off the dashboard with no complex interpretation needed.”

EHE Health

EHE Health is a national preventive health and primary care Center of Excellence provider system.

“As a 106-year-old organization moving toward greater agility and marketplace nimbleness, we needed to drastically upgrade our ability to be transparent within our internal and external ecosystems,” says David Buza, Chief Technology Officer at EHE Health. “With QuickSight, we are not constrained by pre-built BI reports, and can easily customize and track the right operational metrics, such as product utilization, market penetration, and available inventory to gain a holistic view of our business. These inputs help us to understand current performance and future opportunity so that we can provide greater partnership to our clients, while delivering on our brand promise of creating healthier employee populations.

“QuickSight allowed our teams to seamlessly communicate with our clients—all viewing the same information, simultaneously. QuickSight’s embedding capabilities, along with its secure platform, intuitive design, and flexibility, allowed us to service all stakeholders—both internally and externally. This greater flexibility and customization allowed us to fit the client’s needs seamlessly.”

Conclusion

Where data agility and transparency are critical to business success, embedded analytics can open a universe of possibilities, and we are excited to see what our customers will do with these new capabilities.

Additional resources

For more resources, see the following:


About the Author

Dorothy Li is the Vice President and General Manager for Amazon QuickSight.

Building an AWS Glue ETL pipeline locally without an AWS account

Post Syndicated from Adnan Alvee original https://aws.amazon.com/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/

If you’re new to AWS Glue and looking to understand its transformation capabilities without incurring an added expense, or if you’re simply wondering if AWS Glue ETL is the right tool for your use case and want a holistic view of AWS Glue ETL functions, then please continue reading. In this post, we walk you through several AWS Glue ETL functions with supporting examples, using a local PySpark shell in a containerized environment with no AWS artifact dependency. If you’re already familiar with AWS Glue and Apache Spark, you can use this solution as a quick cheat sheet for AWS Glue PySpark validations.

You don’t need an AWS account to follow along with this walkthrough. We use small example datasets for our use case and go through the transformations of several AWS Glue ETL PySpark functions: ApplyMapping, Filter, SplitRows, SelectFields, Join, DropFields, Relationalize, SelectFromCollection, RenameField, Unbox, Unnest, DropNullFields, SplitFields, Spigot and Write Dynamic Frame.

This post provides an introduction of the transformation capabilities of AWS Glue and provides insights towards possible uses of the supported functions. The goal is to get up and running with AWS Glue ETL functions in the shortest possible time, at no cost and without any AWS environment dependency.

Prerequisites

To follow along, you should have the following resources:

  • Basic programming experience
  • Basic Python and Spark knowledge (not required but good to have)
  • A desktop or workstation with Docker installed and running

If you prefer to set up the environment locally outside of a Docker container, you can follow the instructions provided in the GitHub repo, which hosts libraries used in AWS Glue. These libraries extend Apache Spark with additional data types and operations for ETL workflows.

Setting up resources

For this post, we use the amazon/aws-glue-libs:glue_libs_1.0.0_image_01 image from Docker Hub. This image has only been tested for the AWS Glue 1.0 Spark shell (PySpark). It also supports Jupyter and Zeppelin notebooks and a CLI interpreter. For the purpose of this post, we use the CLI interpreter. For more information on the container, please read Developing AWS Glue ETL jobs locally using a container.

To pull the relevant image from the Docker repository, enter the following command in a terminal prompt:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

To test on the command prompt, enter the following code:

docker run -itd --name glue_without_notebook amazon/aws-glue-libs:glue_libs_1.0.0_image_01
docker exec -it glue_without_notebook bash
/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

To test on Jupyter notebooks, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter \
amazon/aws-glue-libs:glue_libs_1.0.0_image_01 \
/home/jupyter/jupyter_start.sh

Browse to ‘localhost:8888’ in a browser to open Jupyter notebooks.

Importing GlueContext

To get started, enter the following import statements in the PySpark shell. We import GlueContext, which wraps the Spark SQLContext, thereby providing mechanisms to interact with Apache Spark:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
glueContext = GlueContext(SparkContext.getOrCreate())

Dataset 1

We first generate a Spark DataFrame consisting of dummy data of an order list for a fictional company. We process the data using AWS Glue PySpark functions.

Enter the following code into the shell:

order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],\
               ['1006', '547', 'NO', '1418901256', '75034'],\
               ['1007', '823', 'YES', '1418901300', '75023'],\
               ['1008', '912', 'NO', '1418901400', '82091'],\
               ['1009', '321', 'YES', '1418902000', '90093']\
             ]

# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema = order_schema)

The following .show() command allows us to view the DataFrame in the shell:

df_orders.show()

# Output
+--------+-----------+--------------+----------+-------+
|order_id|customer_id|essential_item| timestamp|zipcode|
+--------+-----------+--------------+----------+-------+
|    1005|        623|           YES|1418901234|  75091|
|    1006|        547|            NO|1418901256|  75034|
|    1007|        823|           YES|1418901300|  75023|
|    1008|        912|            NO|1418901400|  82091|
|    1009|        321|           YES|1418902000|  90093|
+--------+-----------+--------------+----------+-------+

DynamicFrame

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required. We convert the df_orders DataFrame into a DynamicFrame.

Enter the following code in the shell:

dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf") 

Now that we have our Dynamic Frame, we can start working with the datasets with AWS Glue transform functions.

ApplyMapping

The columns in our data might be in different formats, and you may want to change their respective names. ApplyMapping is the best option for changing the names and formatting all the columns collectively. For our dataset, we change some of the columns to Long from String format to save storage space later. We also shorten the column zipcode to zip. See the following code:

# Input 
dyf_applyMapping = ApplyMapping.apply( frame = dyf_orders, mappings = [ 
  ("order_id","String","order_id","Long"), 
  ("customer_id","String","customer_id","Long"),
  ("essential_item","String","essential_item","String"),
  ("timestamp","String","timestamp","Long"),
  ("zipcode","String","zip","Long")
])

dyf_applyMapping.printSchema()

# Output
root
|-- order_id: long
|-- customer_id: long
|-- essential_item: string
|-- timestamp: long
|-- zip: long

Filter

We now want to prioritize our order delivery for essential items. We can achieve that using the Filter function:

# Input 
dyf_filter = Filter.apply(frame = dyf_applyMapping, f = lambda x: x["essential_item"] == 'YES')

dyf_filter.toDF().show()

# Output 
+--------------+-----------+-----+----------+--------+
|essential_item|customer_id|  zip| timestamp|order_id|
+--------------+-----------+-----+----------+--------+
|           YES|        623|75091|1418901234|    1005|
|           YES|        823|75023|1418901300|    1007|
|           YES|        321|90093|1418902000|    1009|
+--------------+-----------+-----+----------+--------+

Map

Map allows us to apply a transformation to each record of a Dynamic Frame. For our case, we want to target a certain zip code for next day air shipping. We implement a simple “next_day_air” function and pass it to the Dynamic Frame:

# Input 

# This function takes in a dynamic frame record and checks if zipcode
# 75034 is present in it. If present, it adds another column
# "next_day_air" with value as True

def next_day_air(rec):
  if rec["zip"] == 75034:
    rec["next_day_air"] = True
  return rec

mapped_dyF =  Map.apply(frame = dyf_applyMapping, f = next_day_air)

mapped_dyF.toDF().show()

# Output
+--------------+-----------+-----+----------+--------+------------+
|essential_item|customer_id|  zip| timestamp|order_id|next_day_air|
+--------------+-----------+-----+----------+--------+------------+
|           YES|        623|75091|1418901234|    1005|        null|
|            NO|        547|75034|1418901256|    1006|        TRUE|
|           YES|        823|75023|1418901300|    1007|        null|
|            NO|        912|82091|1418901400|    1008|        null|
|           YES|        321|90093|1418902000|    1009|        null|
+--------------+-----------+-----+----------+--------+------------+

Dataset 2

To ship essential orders to the appropriate addresses, we need customer data. We demonstrate this by generating a custom JSON dataset consisting of zip codes and customer addresses. In this use case, this data represents the customer data of the company that we want to join later on.

We generate JSON strings consisting of customer data and use the Spark json function to convert them to a JSON structure (enter each jsonStr variable one at a time in case the terminal errors out):

# Input 
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'
df_row = spark.createDataFrame([
  Row(json=jsonStr1),
  Row(json=jsonStr2),
  Row(json=jsonStr3),
  Row(json=jsonStr4)
])

df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))
df_json.show()

# Output
+-----------------------------------------------------+-----+
|customers                                            |zip  |
+-----------------------------------------------------+-----+
|[[108 Park Street, TX, 623], [763 Marsh Ln, TX, 231]]|75091|
|[[771 Peek Pkwy, GA, 201]]                           |82091|
|[[66 P Street, NY, 343]]                             |75023|
|[[708 Fed Ln, CA, 932], [807 Deccan Dr, CA, 102]]    |90093|
+-----------------------------------------------------+-----+
# Input
df_json.printSchema()

# Output
root
 |-- customers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- zip: long (nullable = true)

To convert the DataFrame back to a DynamicFrame to continue with our operations, enter the following code:

# Input
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

SelectFields

To join with the order list, we don’t need all the columns, so we use the SelectFields function to shortlist the columns we need. In our use case, we need the zip code column, but we can add more columns as the argument paths accepts a list:

# Input
dyf_selectFields = SelectFields.apply(frame = dyf_filter, paths=['zip'])

dyf_selectFields.toDF().show()

# Output
+-----+
|  zip|
+-----+
|75091|
|75023|
|90093|
+-----+

Join

The Join function is straightforward and manages duplicate columns. We had two columns named zip from both datasets. AWS Glue added a period (.) in one of the duplicate column names to avoid errors:

# Input
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')
dyf_join.toDF().show()

# Output
+--------------------+-----+-----+
|           customers| .zip|  zip|
+--------------------+-----+-----+
|[[108 Park Street...|75091|75091|
|[[66 P Street, NY...|75023|75023|
|[[708 Fed Ln, CA,...|90093|90093|
+--------------------+-----+-----+

DropFields

Because we don’t need two columns with the same name, we can use DropFields to drop one or multiple columns all at once. The backticks (`) around .zip inside the function call are needed because the column name contains a period (.):

# Input
dyf_dropfields = DropFields.apply(
  frame = dyf_join,
  paths = "`.zip`"
)

dyf_dropfields.toDF().show()

# Output
+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[108 Park Street...|75091|
|[[66 P Street, NY...|75023|
|[[708 Fed Ln, CA,...|90093|
+--------------------+-----+

Relationalize

The Relationalize function can flatten nested structures and create multiple dynamic frames. Our customer column from the previous operation is a nested structure, and Relationalize can convert it into multiple flattened DynamicFrames:

# Input
dyf_relationize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

To see the DynamicFrames, we can’t run a .show() yet because it’s a collection. We need to check what keys are present. See the following code:

# Input
dyf_relationize.keys()

# Output
dict_keys(['root', 'root_customers'])

In the follow-up function in the next section, we show how to pick the DynamicFrame from a collection of multiple DynamicFrames.

SelectFromCollection

The SelectFromCollection function allows us to retrieve the specific DynamicFrame from a collection of DynamicFrames. For this use case, we retrieve both DynamicFrames from the previous operation using this function.

To retrieve the first DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root')

dyf_selectFromCollection.toDF().show()

# Output
+---------+-----+
|customers|  zip|
+---------+-----+
|        1|75091|
|        2|75023|
|        3|90093|
+---------+-----+

To retrieve the second DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root_customers')

dyf_selectFromCollection.toDF().show()

# Output
+---+-----+---------------------+----------------+
| id|index|customers.val.address|customers.val.id|
+---+-----+---------------------+----------------+
|  2|    0|      66 P Street, NY|             343|
|  3|    0|       708 Fed Ln, CA|             932|
|  3|    1|    807 Deccan Dr, CA|             102|
|  1|    0|  108 Park Street, TX|             623|
|  1|    1|     763 Marsh Ln, TX|             231|
+---+-----+---------------------+----------------+

RenameField

The second DynamicFrame we retrieved from the previous operation has column names that contain a period (.) and are very lengthy. We can change that using the RenameField function:

# Input
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

dyf_dropfields_rf.toDF().show()

# Output
+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|    66 P Street, NY|    343|
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+

ResolveChoice

ResolveChoice can gracefully handle column type ambiguities. For more information about the full capabilities of ResolveChoice, see the GitHub repo.

# Input
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

dyf_resolveChoice.printSchema()

# Output
root
|-- address: string
|-- cust_id: string

Dataset 3

We generate another dataset to demonstrate a few other functions. In this use case, the company’s warehouse inventory data is in a nested JSON structure, which is initially in a String format. See the following code:

# Input
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
               ],\
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}\
              '],
              ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]


warehouse_schema = StructType([StructField("warehouse_loc", StringType())\
                              ,StructField("data", StringType())])

df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema = warehouse_schema)
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

dyf_warehouse.printSchema()

# Output
root
|-- warehouse_loc: string
|-- data: string

Unbox

We use Unbox to extract JSON from String format for the new data. Compare the preceding printSchema() output with the following code:

# Input
dyf_unbox = Unbox.apply(frame = dyf_warehouse, path = "data", format="json")
dyf_unbox.printSchema()
# Output
root
|-- warehouse_loc: string
|-- data: struct
|    |-- strawberry: int
|    |-- pineapple: int
|    |-- mango: int
|    |-- pears: null

# Input 
dyf_unbox.toDF().show()

# Output
+-------------+----------------+
|warehouse_loc|            data|
+-------------+----------------+
| TX_WAREHOUSE|[220, 560, 350,]|
| CA_WAREHOUSE|  [34, 123, 42,]|
| CO_WAREHOUSE|  [340, 180, 2,]|
+-------------+----------------+

Unnest

Unnest allows us to flatten a single DynamicFrame to a more relational table format. We apply Unnest to the nested structure from the previous operation and flatten it:

# Input
dyf_unnest = UnnestFrame.apply(frame = dyf_unbox)

dyf_unnest.printSchema()

# Output 
root
|-- warehouse_loc: string
|-- data.strawberry: int
|-- data.pineapple: int
|-- data.mango: int
|-- data.pears: null

dyf_unnest.toDF().show()

# Output
+-------------+---------------+--------------+----------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|data.pears|
+-------------+---------------+--------------+----------+----------+
| TX_WAREHOUSE|            220|           560|       350|      null|
| CA_WAREHOUSE|             34|           123|        42|      null|
| CO_WAREHOUSE|            340|           180|         2|      null|
+-------------+---------------+--------------+----------+----------+

DropNullFields

The DropNullFields function makes it easy to drop columns with all null values. Our warehouse data indicates that every warehouse is out of pears, so the pears column contains only null values and can be dropped. We apply the DropNullFields function on the DynamicFrame, which automatically identifies the columns with null values and drops them:

# Input
dyf_dropNullfields = DropNullFields.apply(frame = dyf_unnest)

dyf_dropNullfields.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

SplitFields

SplitFields allows us to split a DynamicFrame into two. The function takes the field names to include in the first DynamicFrame, followed by the names of the two resulting DynamicFrames:

# Input
dyf_splitFields = SplitFields.apply(frame = dyf_dropNullfields, paths = ["`data.strawberry`", "`data.pineapple`"], name1 = "a", name2 = "b")

For the first DynamicFrame, see the following code:

# Input
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

# Output
+---------------+--------------+
|data.strawberry|data.pineapple|
+---------------+--------------+
|            220|           560|
|             34|           123|
|            340|           180|
+---------------+--------------+

For the second Dynamic Frame, see the following code:

# Input
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()

# Output
+-------------+----------+
|warehouse_loc|data.mango|
+-------------+----------+
| TX_WAREHOUSE|       350|
| CA_WAREHOUSE|        42|
| CO_WAREHOUSE|         2|
+-------------+----------+

SplitRows

SplitRows allows us to split a DynamicFrame into two based on a range of values: rows that satisfy the comparison go into the first DynamicFrame, and the remaining rows go into the second:

# Input
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')

For the first Dynamic Frame, see the following code:

# Input
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, 'pa_200_less')
dyf_pa_200_less.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

For the second Dynamic Frame, see the following code:

# Input
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, 'pa_200_more')
dyf_pa_200_more.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
+-------------+---------------+--------------+----------+

Spigot

Spigot allows you to write a sample dataset to a destination during transformation. For our use case, we write the top 10 records locally:

# Input
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

Depending on your local environment configuration, Spigot may run into errors. Alternatively, you can use an AWS Glue endpoint or an AWS Glue ETL job to run this function.

Write Dynamic Frame

The write_dynamic_frame function writes a DynamicFrame using the specified connection and format. For our use case, we write locally (we use a connection_type of S3 with a POSIX path argument in connection_options, which allows writing to local storage):

# Input
glueContext.write_dynamic_frame.from_options(\
frame = dyf_splitFields,\
connection_options = {'path': '/home/glue/GlueLocalOutput/'},\
connection_type = 's3',\
format = 'json')

Conclusion

This post discussed the PySpark ETL capabilities of AWS Glue. Further testing with an AWS Glue development endpoint, or directly adding jobs in AWS Glue, is a good next step to take the learning forward. For more information, see General Information about Programming AWS Glue ETL Scripts.


About the Authors

Adnan Alvee is a Big Data Architect for AWS ProServe Remote Consulting Services. He helps build solutions for customers leveraging their data and AWS services. Outside of AWS, he enjoys playing badminton and drinking chai.


Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS. He is an ardent data engineer and relishes connecting with the data analytics community.


How Our Paths Brought Us to Data and Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-our-paths-brought-us-to-data-and-netflix-4eced44a6872

Part of our series on who works in Analytics at Netflix — and what the role entails

by Julie Beckley & Chris Pham

This Q&A provides insights into the diverse set of skills, projects, and culture within Data Science and Engineering (DSE) at Netflix through the eyes of two team members: Chris Pham and Julie Beckley.

Photo from a team curling offsite — There’s us to the right!

[Chris] Julie and I joined the Streaming DSE team at Netflix a few years ago and have been close colleagues and friends since then. At work, we regularly lean on each other for help based on our respective areas of expertise — I bring my breadth of big data tools and technologies while Julie has been building statistical models for the past decade. Outside of work, we share a love of good food and coffee, exchanging tips on making espresso.

1. What was your path to working in data?

[Julie] I took a traditional path to data science. Since mathematics was my favorite subject in school, I decided to pursue it for my bachelor’s degree at McGill University (while indulging in French culture in the beautiful city of Montreal). Over the course of the four years, it became clear that I enjoyed combining analytical skills with solving real-world problems, so a PhD in Statistics was a natural next step. After completing my education, I was still not certain whether I wanted a job in academia or industry. I took a role as a Research Staff Member at IBM Research, which served as a middle ground with a joint focus on real-world applications, academic research, and even allowed me to teach a graduate Machine Learning course! I then transitioned to a full industry role at Netflix.

[Chris] I initially wanted to build a career in consulting after receiving my graduate degree in Economics because I had a passion for analytical problem solving and statistical modeling. A role in data science eventually seemed like a natural transition, but it wasn’t without its hurdles: With my consulting background, I had to go through a few other roles first while learning how to code on the side. A lot of my learning and training was self-guided until 2016, when a manager at my last company took a chance on me and helped me make the rare transfer from a role in HR to Data Science.

2. Tell me about some of the exciting projects you’re a part of.

[Julie] Chris and I have the same primary stakeholders (or engineering team that we support): Encoding Technologies. They are continuously innovating compression algorithms to efficiently send high quality audio and video files to our customers over the internet. I focus on improving experimentation methodology to test how well the newest files are working: do they need less bits to stream while providing a higher video quality? Do they cause less errors? My work is typically developed in R or Python. I love the cross-functional nature of my work, as it allows me to learn from others and creatively explore new statistical methodologies to improve the Netflix service.

[Chris] When I first started working with Encoding Technologies, there was so much data waiting to be translated into actionable insights. It was fun starting from almost nothing and transforming all of that data into self-serve tools and dashboards for the team to understand their contribution to the Netflix streaming experience. These projects have involved using Spark, Python, SQL, Tableau, and Jupyter notebooks. Over the last year, I’ve spent a lot of time analyzing data to inform how we roll out new encoding innovations to the diverse ecosystem of devices that stream Netflix.

3. How do your projects impact the business at Netflix?

[Julie] Encoding experimentation (and more broadly, streaming experimentation) is critical for ensuring our customers have a good Quality of Experience when watching Netflix. In other words, the content you’re about to watch needs to load quickly with high video quality. When we test new encodes, we need effective data science methods to quickly and accurately understand whether customers are having a better experience. With these insights, the engineering teams can quickly understand what’s working well and what needs to be improved. It’s super exciting to see the impact of my work when I hear from friends and family that Netflix is streaming well for them!

[Chris] There’s a lot of things to consider when we roll out a new compression algorithm. Which devices get this treatment? What is the benefit to the streaming experience? Is the benefit uniform, or do certain cohorts of members — such as those who stream over a cellular connection — benefit more? How does a decision of this scale affect the efficiency of our globally distributed content delivery network, Open Connect? It’s one big optimization problem that requires balancing several different factors. Streaming DSE is at the center of it all, bringing together different teams at Netflix and using data to drive decisions that impact our members around the world.

4. What does it take to succeed at Netflix in a data role?

[Julie] One of the special things about working at Netflix is that a diverse set of skills and backgrounds is truly appreciated, since there are many ways to add value to the company. From my experience, being proactive in pushing forward on your ideas is key. The values in the Netflix culture document create a framework where everyone can operate as a leader: we expect initiative, direct and candid feedback, and transparency in everything we do. This leads to a great environment where I am constantly challenged, learning, and receiving constructive feedback on how I can do better!

[Chris] I think a big part of our jobs is continuously thinking about how data can benefit our stakeholders. Julie and I will never know as much about video and audio compression algorithms as our talented Encoding Technologies team, but we should be the ones most familiar with the data: How to access, analyze, and visualize it; how to transform it into metrics that act as strong and accurate proxies for a member’s experience; and how to guide others to draw the right conclusions from data so they can act on it. Writing memos is a big part of Netflix culture, which I’ve found has been helpful for sharing ideas, soliciting feedback, and documenting project details. So writing well, especially the ability to translate technical concepts for a non-technical audience, is also very useful.

5. What piece of advice would you pass along to those just starting out their career in data?

[Julie] One piece of advice I would pass along (and wish I could give to my younger self) is not to stress and try to plan every step of your data science career. Your career is long (and unpredictable!), so as long as you work hard and stay motivated, it will move in an exciting direction.

[Chris] Everyone wants to build fancy models or tools, but fewer are willing to do the foundational things like cleaning the data and writing the documentation. I’ve found that volunteering and being proactive (no matter the task) has been an effective way of building trust with others, and it opened my career up to many more opportunities early on.

If this post resonates with you and you’d like to explore opportunities with Netflix, check out our analytics site, search open roles, and learn about our culture. You can also find more stories like this here.


How Our Paths Brought Us to Data and Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Analytics at Netflix: Who we are and what we do

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/analytics-at-netflix-who-we-are-and-what-we-do-7d9c08fe6965

Analytics at Netflix: Who We Are and What We Do

An Introduction to Analytics and Visualization Engineering at Netflix

by Molly Jackman & Meghana Reddy

Explained: Season 1 (Photo Credit: Netflix)

Across nearly every industry, there is recognition that data analytics is key to driving informed business decision-making. But there is far less agreement on what that term “data analytics” actually means — or what to call the people responsible for the work.

Even within Netflix, we have many groups that do some form of data analysis, including business strategy and consumer insights. But here we are talking about Netflix’s Data Science and Engineering group, which specializes in analytics at scale. The group has technical, engineering-oriented roles that fall under two broad category titles: “Analytics Engineers” and “Visualization Engineers.” In this post, we refer to these two titles collectively as the “analytics role.” These professionals come from a wide range of backgrounds and bring different skills to their work, while sharing a common drive to generate and scale business impact through data.

What’s the purpose of the analytics role at Netflix?

When you think about data at Netflix, what comes to mind? Oftentimes it is our content recommendation algorithm or the online delivery of video to your device at home. Both are integral parts of the business, but far from the whole picture. Data is used to inform a wide range of questions — ‘How can we make the product experience even better?’, ‘Which shows and films bring the most joy to our members?’, ‘Who can we partner with to expand access to our service in new markets?’. Our Analytics and Visualization Engineers are taking on these and other big questions for the company, informing decision-making across every corner of the business.

We align our analytic teams with business area verticals

Since the problem space is so varied, we align our analytics professionals with business area verticals rather than organizing them within a single functional horizontal. The expectation is that individuals in these roles possess deep business context and are thought leaders alongside their business counterparts. This enables them to fully understand where their partners are coming from. It also means Analytics and Visualization Engineers are a specialized resource and a rare commodity. There are many more questions and stakeholders than analytics team members, and the job is not to take on every request. Instead, these individual contributors are given freedom to choose their projects and are responsible for prioritizing the ones that will have the most business impact (and deprioritizing the rest). This requires a lot of judgment and embodies our “context not control” culture.

“OK, but what do they actually do…?”

What does the job entail?

You’ve probably caught on to some common themes: People in the analytics role are highly connected to the business, solve end-to-end problems, and are directly responsible for improving business outcomes. But what makes this group really shine are their differences. They come from lots of backgrounds, which yields different perspectives on how to approach problems. We use the catch-all titles of Analytics and Visualization Engineers so as to not get too hung up on specific credentials. Instead, people are empowered to leverage their unique skills to make Netflix better.

A couple other defining characteristics of the role are full ownership of the problem (in Netflix lingo, you are the “informed captain” of your space) and creating trustworthy outputs. These are only possible through the one-two punch of deep business context 👊 and technical excellence 👊. Full ownership often means building new data pipelines, navigating complex schemas and large data sets, developing or improving metrics for business performance, and creating intuitive visualizations and dashboards — always with an eye towards actionable insights.

Because these professionals vary in their expertise, so too does their day-to-day. Below are three broadly defined personas to help illustrate some of the different backgrounds, motivations, and activities of individuals in the analytics role at Netflix. Many of our colleagues have come in with expertise that spans multiple personas. Others have grown into new areas as part of their professional development at Netflix. Ultimately, these skills are all on a continuum, some broad and some deep, and these are just a few examples of such expertise. So if you find yourself connecting with any part of these descriptions, the analytics role could be for you.

  • The Analyst is motivated by delivering metrics, findings, or dashboards that drive analytical insights and business decisions. They love to communicate their discoveries to nontechnical audiences, explain caveats, and debate analytic choices and strategic implications with peers and stakeholders. Their expertise is descriptive analytic methodology, but they have the necessary tools to be scrappy (e.g. coding, math, stats), and do what’s required to answer the highest priority business questions.
  • The Engineer enjoys making data available by piping it in from new sources in optimal ways, building robust data models, prototyping systems, and doing project-specific engineering. They’re still analysts at heart but, similar to data engineers, they have a deep understanding of data warehouse capabilities and are pros at data processing optimization and performance tuning. Being at this intersection of disciplines allows them to produce full-stack outputs, layering visualizations and analytics on their projects.
  • The Visualizer is passionate about the scalability, beauty, and functionality of dashboards and their capability for telling a visual story. They also have an eye for principled engineering, i.e. managing the data under the surface. They want to pick the perfect chart type for the narrative while also focusing on delivering key analytic insights. They may use industry tools (e.g. Tableau, Looker, Power BI) to their fullest extent, developing a deeper understanding of analytics by examining these tools under the hood. Or they may create sophisticated visuals from scratch and build the type of custom UI that enterprise tools don’t offer (e.g. JavaScript web apps).

Introducing Analytics at Netflix

Whether you’re a data professional, student, or Netflix enthusiast, we invite you to meet our stunning colleagues and hear their stories. If this series resonates with you and you’d like to explore opportunities with us, check out our analytics site, search open roles, and learn about our culture.

Welcome to Analytics at Netflix!


Analytics at Netflix: Who we are and what we do was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How to delete user data in an AWS data lake

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/how-to-delete-user-data-in-an-aws-data-lake/

General Data Protection Regulation (GDPR) is an important aspect of today’s technology world, and processing data in compliance with GDPR is a necessity for those who implement solutions within the AWS public cloud. One article of GDPR is the “right to erasure” or “right to be forgotten” which may require you to implement a solution to delete specific users’ personal data.

In the context of the AWS big data and analytics ecosystem, every architecture, regardless of the problem it targets, uses Amazon Simple Storage Service (Amazon S3) as the core storage service. Despite its versatility and feature completeness, Amazon S3 doesn’t come with an out-of-the-box way to map a user identifier to the S3 keys of objects that contain that user’s data.

This post walks you through a framework that helps you purge individual user data within your organization’s AWS hosted data lake, and an analytics solution that uses different AWS storage layers, along with sample code targeting Amazon S3.

Reference architecture

To address the challenge of implementing a data purge framework, we reduced the problem to the straightforward use case of deleting a user’s data from a platform that uses AWS for its data pipeline. The following diagram illustrates this use case.

We’re introducing the idea of building and maintaining an index metastore that keeps track of the location of each user’s records and allows us locate to them efficiently, reducing the search space.

You can use the following architecture diagram to delete a specific user’s data within your organization’s AWS data lake.

For this initial version, we created three user flows that map each task to a fitting AWS service:

Flow 1: Real-time metastore update

The S3 ObjectCreated or ObjectRemoved events trigger an AWS Lambda function that parses the object and performs an add/update/delete operation to keep the metadata index up to date. You can implement a simple workflow for any other storage layer, such as Amazon Relational Database Service (RDS), Amazon Aurora, or Amazon Elasticsearch Service (ES). We use Amazon DynamoDB and Amazon RDS for PostgreSQL as the index metadata storage options, but our approach is flexible enough to accommodate any other technology.

Flow 2: Purge data

When a user asks for their data to be deleted, we trigger an AWS Step Functions state machine through Amazon CloudWatch to orchestrate the workflow. Its first step triggers a Lambda function that queries the metadata index to identify the storage layers that contain user records and generates a report that’s saved to an S3 report bucket. A Step Functions activity is created and picked up by a Node.js-based Lambda worker that sends an email to the approver through Amazon Simple Email Service (SES) with approve and reject links.

The following diagram shows a graphical representation of the Step Function state machine as seen on the AWS Management Console.

The approver selects one of the two links, which then calls an Amazon API Gateway endpoint that invokes Step Functions to resume the workflow. If you choose the approve link, Step Functions triggers a Lambda function that takes the report stored in the bucket as input, deletes the objects or records from the storage layer, and updates the index metastore. When the purging job is complete, Amazon Simple Notification Service (SNS) sends a success or fail email to the user.

The following diagram represents the Step Functions flow on the console if the purge flow completed successfully.

For the complete code base, see step-function-definition.json in the GitHub repo.

Flow 3: Batch metastore update

This flow refers to the use case of an existing data lake for which the index metastore needs to be created. You can orchestrate the flow through AWS Step Functions, which takes historical data as input and updates the metastore through a batch job. Our current implementation doesn’t include a sample script for this user flow.

Our framework

We now walk you through the two use cases we followed for our implementation:

  • You have multiple user records stored in each Amazon S3 file
  • A user has records stored in homogenous AWS storage layers

Within these two approaches, we demonstrate alternatives that you can use to store your index metastore.

Indexing by S3 URI and row number

For this use case, we use a free tier RDS Postgres instance to store our index. We created a simple table with the following code:

CREATE UNLOGGED TABLE IF NOT EXISTS user_objects (
				userid TEXT,
				s3path TEXT,
				recordline INTEGER
			);

You can create an index on userid to optimize query performance. On object upload, for each row, you need to insert into the user_objects table a row that indicates the user ID, the URI of the target Amazon S3 object, and the row number that corresponds to the record. For instance, consider uploading the following JSON input:

{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"…"}
{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"…"}
{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body":"…"}

We insert the tuples into user_objects in the Amazon S3 location s3://gdpr-demo/year=2018/month=2/day=26/input.json. See the following code:

("V34qejxNsCbcgD8C0HVk-Q", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 0)
("ofKDkJKXSKZXu5xJNGiiBQ", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 1)
("UgMW8bLE0QMJDCkQ1Ax5Mg", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 2)

You can implement the index update operation by using a Lambda function triggered on any Amazon S3 ObjectCreated event.
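As a minimal sketch of what such a function could look like (not the repo's implementation), assuming newline-delimited JSON records with a user_id field, a psycopg2 package available to the function (for example, via a Lambda layer), and placeholder connection settings supplied through environment variables:

import json
import os

import boto3
import psycopg2  # assumed to be available to the function, for example via a Lambda layer

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # Placeholder connection settings; in practice, fetch them from Secrets Manager.
    conn = psycopg2.connect(
        host=os.environ["PG_HOST"],
        dbname=os.environ["PG_DB"],
        user=os.environ["PG_USER"],
        password=os.environ["PG_PASSWORD"],
    )
    with conn, conn.cursor() as cur:
        for record in event["Records"]:
            bucket = record["s3"]["bucket"]["name"]
            key = record["s3"]["object"]["key"]
            s3path = f"s3://{bucket}/{key}"
            body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
            # Each line of the object is one JSON record; its line number is the record line.
            for line_no, line in enumerate(body.splitlines()):
                user_id = json.loads(line)["user_id"]
                cur.execute(
                    "INSERT INTO user_objects (userid, s3path, recordline) VALUES (%s, %s, %s)",
                    (user_id, s3path, line_no),
                )
    conn.close()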

When we get a delete request from a user, we need to query our index to get some information about where we have stored the data to delete. See the following code:

SELECT s3path,
       ARRAY_AGG(recordline)
FROM user_objects
WHERE userid = 'V34qejxNsCbcgD8C0HVk-Q'
GROUP BY s3path;

The preceding example SQL query returns rows like the following:

("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", {2102,529})

The output indicates that lines 529 and 2102 of S3 object s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json contain the requested user’s data and need to be purged. We then need to download the object, remove those rows, and overwrite the object. For a Python implementation of the Lambda function that implements this functionality, see deleteUserRecords.py in the GitHub repo.

Having the record line available allows you to perform the deletion efficiently in byte format. For implementation simplicity, we purge the rows by replacing the deleted rows with an empty JSON object. You pay a slight storage overhead, but you don’t need to update subsequent row metadata in your index, which would be costly. To eliminate empty JSON objects, we can implement an offline vacuum and index update process.
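The repo’s deleteUserRecords.py is the reference implementation; the following is only a rough sketch of the core idea, with hypothetical names, that blanks out the identified record lines and overwrites the object:

import boto3

s3 = boto3.client("s3")

def purge_rows(s3path, record_lines):
    """Replace the given record lines of an S3 object with empty JSON objects."""
    bucket, key = s3path.replace("s3://", "").split("/", 1)
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
    lines = body.splitlines()
    for line_no in record_lines:
        lines[line_no] = "{}"  # keeps line numbering stable for the rest of the index
    s3.put_object(Bucket=bucket, Key=key, Body=("\n".join(lines) + "\n").encode("utf-8"))

# Using the example output of the index query above:
purge_rows("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", [2102, 529])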

Indexing by file name and grouping by index key

For this use case, we created a DynamoDB table to store our index. We chose DynamoDB because of its ease of use and scalability; you can use its on-demand pricing model so you don’t need to guess how many capacity units you might need. When files are uploaded to the data lake, a Lambda function parses the file name (for example, 1001-.csv) to identify the user identifier and populates the DynamoDB metadata table. Userid is the partition key, and each different storage layer has its own attribute. For example, if user 1001 had data in Amazon S3 and Amazon RDS, their records look like the following code:

{"userid": 1001, "s3": {"s3://path1", "s3://path2"}, "RDS": {"db1.table1.column1"}}

For a sample Python implementation of this functionality, see update-dynamo-metadata.py in the GitHub repo.
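The repo’s update-dynamo-metadata.py is the reference implementation; purely as an illustration, and assuming a table and attribute layout like the example record above (the table name is hypothetical), the update could look like this:

import boto3

# Hypothetical table name; the partition key is userid, as in the example record above.
table = boto3.resource("dynamodb").Table("gdpr-metadata-index")

def index_uploaded_object(bucket, key):
    """Record that an uploaded object contains data for the user named in the file prefix."""
    user_id = int(key.split("/")[-1].split("-")[0])  # e.g. "1001-data.csv" -> 1001
    table.update_item(
        Key={"userid": user_id},
        # ADD on a string set appends the path without overwriting existing entries.
        UpdateExpression="ADD #s3 :path",
        ExpressionAttributeNames={"#s3": "s3"},
        ExpressionAttributeValues={":path": {f"s3://{bucket}/{key}"}},
    )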

On delete request, we query the metastore table, which is DynamoDB, and generate a purge report that contains details on what storage layers contain user records, and storage layer specifics that can speed up locating the records. We store the purge report to Amazon S3. For a sample Lambda function that implements this logic, see generate-purge-report.py in the GitHub repo.

After the purging is approved, we use the report as input to delete the required resources. For a sample Lambda function implementation, see gdpr-purge-data.py in the GitHub repo.

Implementation and technology alternatives

We explored and evaluated multiple implementation options, all of which present tradeoffs, such as implementation simplicity, efficiency, critical data compliance, and feature completeness:

  • Scan every record of the data file to create an index – Whenever a file is uploaded, we iterate through its records and generate tuples (userid, s3Uri, row_number) that are then inserted to our metadata storing layer. On delete request, we fetch the metadata records for requested user IDs, download the corresponding S3 objects, perform the delete in place, and re-upload the updated objects, overwriting the existing object. This is the most flexible approach because it supports a single object to store multiple users’ data, which is a very common practice. The flexibility comes at a cost because it requires downloading and re-uploading the object, which introduces a network bottleneck in delete operations. User activity datasets such as customer product reviews are a good fit for this approach, because it’s unexpected to have multiple records for the same user within each partition (such as a date partition), and it’s preferable to combine multiple users’ activity in a single file. It’s similar to what was described in the section “Indexing by S3 URI and row number” and sample code is available in the GitHub repo.
  • Store metadata as file name prefix – Adding the user ID as the prefix of the uploaded object under the different partitions that are defined based on query pattern enables you to reduce the required search operations on delete request. The metadata handling utility finds the user ID from the file name and maintains the index accordingly. This approach is efficient in locating the resources to purge but assumes a single user per object, and requires you to store user IDs within the filename, which might require InfoSec considerations. Clickstream data, where you would expect to have multiple click events for a single customer on a single date partition during a session, is a good fit. We covered this approach in the section “Indexing by file name and grouping by index key” and you can download the codebase from the GitHub repo.
  • Use a metadata file – Along with uploading a new object, we also upload a metadata file that’s picked up by an indexing utility to create and maintain the index up to date. On delete request, we query the index, which points us to the records to purge. A good fit for this approach is a use case that already involves uploading a metadata file whenever a new object is uploaded, such as uploading multimedia data, along with their metadata. Otherwise, uploading a metadata file on every object upload might introduce too much of an overhead.
  • Use the tagging feature of AWS services – Whenever a new file is uploaded to Amazon S3, we use the Put Object Tagging Amazon S3 operation to add a key-value pair for the user identifier. Whenever there is a user data delete request, it fetches objects with that tag and deletes them. This option is straightforward to implement using the existing Amazon S3 API and can therefore be a very initial version of your implementation. However, it involves significant limitations. It assumes a 1:1 cardinality between Amazon S3 objects and users (each object only contains data for a single user), searching objects based on a tag is limited and inefficient, and storing user identifiers as tags might not be compliant with your organization’s InfoSec policy.
  • Use Apache Hudi – Apache Hudi is becoming a very popular option to perform record-level data deletion on Amazon S3. Its current version is restricted to Amazon EMR, and you can use it if you start to build your data lake from scratch, because you need to store your data as Hudi datasets. Hudi is a very active project and additional features and integrations with more AWS services are expected.

The key implementation decision of our approach is separating the storage layer we use for our data and the one we use for our metadata. As a result, our design is versatile and can be plugged in any existing data pipeline. Similar to deciding what storage layer to use for your data, there are many factors to consider when deciding how to store your index:

  • Concurrency of requests – If you don’t expect too many simultaneous inserts, even something as simple as Amazon S3 could be a starting point for your index. However, if you get multiple concurrent writes for multiple users, you need to look into a service that copes better with transactions.
  • Existing team knowledge and infrastructure – In this post, we demonstrated using DynamoDB and RDS Postgres for storing and querying the metadata index. If your team has no experience with either of those but are comfortable with Amazon ES, Amazon DocumentDB (with MongoDB compatibility), or any other storage layer, use those. Furthermore, if you’re already running (and paying for) a MySQL database that’s not used to capacity, you could use that for your index for no additional cost.
  • Size of index – The volume of your metadata is orders of magnitude lower than your actual data. However, if your dataset grows significantly, you might need to consider going for a scalable, distributed storage solution rather than, for instance, a relational database management system.

Conclusion

GDPR has transformed best practices and introduced several extra technical challenges in designing and implementing a data lake. The reference architecture and scripts in this post may help you delete data in a manner that’s compliant with GDPR.

Let us know your feedback in the comments and how you implemented this solution in your organization, so that others can learn from it.


About the Authors

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.


Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and travel.

Streaming data from Amazon S3 to Amazon Kinesis Data Streams using AWS DMS

Post Syndicated from Mahesh Goyal original https://aws.amazon.com/blogs/big-data/streaming-data-from-amazon-s3-to-amazon-kinesis-data-streams-using-aws-dms/

Stream processing is very useful in use cases where we need to detect a problem quickly and improve the outcome based on data, for example production line monitoring or supply chain optimizations.

This blog post walks you through the process of streaming existing data files and ongoing changes from Amazon Simple Storage Service (Amazon S3) to Amazon Kinesis. You achieve this by using AWS Database Migration Service (AWS DMS). AWS DMS enables you to seamlessly migrate data from supported sources to relational databases, data warehouses, streaming platforms, and other data stores in the AWS Cloud.

Many SaaS and third-party applications already integrate with Amazon S3 and can deliver records to S3 buckets. In certain use cases, you need to further process this data in near-real time to generate alerts. Use cases like threat detection and application monitoring require generating insights in seconds. Waiting for batch processes often leads to a delay in data analysis and reduces the ability of systems to respond quickly to critical situations. For such use cases, you need a way to convert batch to stream processing by expanding the existing integrations of your applications with Amazon S3.

You can use AWS DMS for such data-processing requirements. AWS DMS lets you expand your existing application’s integration with Amazon S3 to produce data in Amazon Kinesis Data Streams for real-time analytics without writing and maintaining new code. AWS DMS supports specifying Amazon S3 as the source and streaming services like Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK) as the target. AWS DMS allows migration of full load and change data capture (CDC) files to these services. AWS DMS performs this task out of the box without any complex configuration or code development. You can also configure an AWS DMS replication instance to scale up or down depending on the workload.
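As a rough, illustrative sketch only (not part of the original walkthrough), the corresponding source and target endpoints could also be defined with the AWS SDK for Python; every ARN, bucket, and identifier below is a placeholder:

import boto3

dms = boto3.client("dms")

# S3 source endpoint: points AWS DMS at the bucket that holds the full load and CDC files.
dms.create_endpoint(
    EndpointIdentifier="s3-source-endpoint",  # placeholder
    EndpointType="source",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-s3-access",  # placeholder
        "BucketName": "my-source-bucket",  # placeholder
        "ExternalTableDefinition": open("table-definition.json").read(),
        "CdcPath": "cdcfile",
    },
)

# Kinesis target endpoint: AWS DMS publishes each record as a JSON message on the stream.
dms.create_endpoint(
    EndpointIdentifier="kinesis-target-endpoint",  # placeholder
    EndpointType="target",
    EngineName="kinesis",
    KinesisSettings={
        "StreamArn": "arn:aws:kinesis:us-west-2:123456789012:stream/dms-stream",  # placeholder
        "MessageFormat": "json",
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-kinesis-access",  # placeholder
    },
)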

For this post, we focus on streaming data to Kinesis. We deploy an AWS CloudFormation template to get started in minutes and explore the streaming pipeline.

Architecture overview

Third-party applications such as web, API, and data-integration services produce data and log files in S3 buckets. Data lakes built on AWS process and store data in Amazon S3 at different stages. AWS DMS supports Amazon S3 as the source and Kinesis as the target, so data stored in an S3 bucket is streamed to Kinesis. Several consumers, such as AWS Lambda, Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and the Kinesis Consumer Library (KCL), can consume the data concurrently to perform real-time analytics on the dataset. Each AWS service in this architecture can scale independently as needed.

The following diagram shows the architecture of this solution.

Deploying AWS CloudFormation

To get started, you first deploy the CloudFormation template to create the core components of the architecture. AWS CloudFormation automates the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and accounts with the least amount of effort and time. To create these resources, complete the following steps:

  1. Sign in to the AWS Management Console and choose the us-west-2 Region.
  2. Choose Launch Stack:
  3. Choose Next.

 This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template on the console.

  4. For Stack name, enter a stack name.
  5. On the next screen, choose your VPC and subnet IDs.
  6. For Does DMS VPC and Cloudwatch role Exists?, enter Y if the managed AWS Identity and Access Management (IAM) roles dms-vpc-role and dms-cloudwatch-logs-role exist in your account. Otherwise, leave at the default N.

If you want to deploy the AWS DMS endpoint in a private subnet, enable the VPC endpoints for Kinesis and Amazon S3 before deploying the template.

  7. Choose Next.
  8. Acknowledge resource creation under Capabilities on the final screen and choose Create.

The stack takes 5–10 minutes to complete, during which it performs the following:

  • Creates a source S3 bucket and target Kinesis data stream with two shards.
  • Creates an AWS DMS replication instance, Amazon S3 source endpoint, and Kinesis target.
  • Maps the S3 bucket and data stream to their respective endpoints.
  • Configures a replication task with the required parameters.
  • Creates an AWS Lambda function with a trigger to consume records from Kinesis. For more information, see Using AWS Lambda with Amazon Kinesis.

The files required for this demo don’t come with the template. Download blog_sample_file.zip and upload it to the source bucket before starting the AWS DMS task.

Using Amazon S3 as the source

When you use Amazon S3 as the source, the data files (full load and CDC) must be in comma-separated value (CSV) format.

In addition to the data files, AWS DMS also requires an external table definition. An external table definition is a JSON document that describes how AWS DMS should interpret the data from Amazon S3.

Amazon S3 file paths for full load and CDC files are required for AWS DMS to run the task. Make sure that file names are sequentially numbered to replicate the data in the correct order. In addition, AWS DMS allows you to specify the column delimiter, row delimiter, and other parameters using extra connection attributes.

AWS DMS identifies the operation to perform for each load record from the record’s keyword value, which can be either INSERT or I.

For more information, see Using Amazon S3 as a source for AWS DMS.

Using Amazon Kinesis as the target

AWS DMS publishes records to a Kinesis data stream as JSON. During conversion, AWS DMS serializes each record from the source Amazon S3 files into an attribute-value pair in JSON format.

AWS DMS publishes each record in the source Amazon S3 file as one JSON data record in a data stream regardless of the action specified in the source file.

Additionally, AWS DMS allows object mapping to migrate data from source files to a data stream. Object mapping determines the structure of data records in the stream.

AWS DMS also supports multi-threaded migration for full load and CDC with task settings. You can improve performance by configuring multiple threads, the buffer size, and the parallel queue.

For more information, see Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service.

Walkthrough

The AWS CloudFormation deployment takes care of all the infrastructure. Now you need files to complete this use case.

  1. Download blog_sample_file.zip, which contains full and CDC load files in CSV format.

If your source files aren’t in CSV, convert the file format to CSV. One conversion method is to use AWS Glue. For more information, see Format Options for ETL Inputs and Outputs in AWS Glue.

The following screenshot shows the sample records of the full load files that you use for this use case.

CDC files require additional attributes for AWS DMS to identify the action, table, and schema.

  1. Reformat the files as follows:
  • Operation – The change operation to be performed: INSERT or I, UPDATE or U, or DELETE or D.
  • Table name – The name of the source table.
  • Schema name – The name of the source schema.
  • Data – One or more columns that represent the data to be changed.

The following screenshot shows sample records of the CDC file.

External table definition is required in the source endpoint configuration. For this post, the definition is embedded in AWS CloudFormation.

  1. Enter the following code for the table definition for the full and CDC files:
    {
        "TableCount": "1",
        "Tables": [{
            "TableName": "table01",
            "TablePath": "schema01/table01/",
            "TableOwner": "schema01",
            "TableColumns": [{
                "ColumnName": "ingest_time",
                "ColumnType": "TIMESTAMP",
                "ColumnNullable": "false",
                "ColumnIsPk": "true"
            }, {
                "ColumnName": "doi",
                "ColumnType": "STRING",
                "ColumnLength": "30"
            }, {
                "ColumnName": "id",
                "ColumnType": "INT8"
            }, {
                "ColumnName": "value",
                "ColumnType": "NUMERIC",
                "ColumnPrecision": "5",
                "ColumnScale": "2"
            }, {
                "ColumnName": "data_sig",
                "ColumnType": "STRING",
                "ColumnLength": "10"
            }],
            "TableColumnsTotal": "5"
        }]
    }
    

  2. Create folder structures under the source S3 bucket created through the CloudFormation template.
    1. Create folders schema01/table01/ for full load and cdcfile/ for CDC data files.
    2. Also, file names should be incremental, as listed in the following CLI output.
      $aws s3 ls s3://blog-xxxxxxxx/schema01/table01 --recursive --human-readable --summarize
      2020-08-03 22:05:57    5.0 MiB schema01/table01/full_000
      2020-08-03 22:05:51    5.0 MiB schema01/table01/full_001
      2020-08-03 22:06:00    5.0 MiB schema01/table01/full_002
      2020-08-03 22:05:56    5.0 MiB schema01/table01/full_003
      2020-08-03 22:05:59    3.1 MiB schema01/table01/full_004
      
      $aws s3 ls s3://blog-xxxxxxxx/cdcfile --recursive --human-readable --summarize
      2020-08-03 22:06:28    4.8 MiB cdc/cdc_000
      2020-08-03 22:06:28    4.8 MiB cdc/cdc_001
      2020-08-03 22:06:26    4.8 MiB cdc/cdc_002
      2020-08-03 22:06:19    4.8 MiB cdc/cdc_003
      

  3. After the files are copied, on the AWS DMS console, choose Replication.
  4. Validate the instance status and configuration.
  5. Choose Endpoints.
  6. Validate the status and configuration of the Amazon S3 source endpoint and make sure that the connection to the replication instance is successful.
  7. Similarly, validate the status and configuration of Kinesis target endpoint and make sure that the connection to the replication instance is successful.
  8. Choose Database migration task.
  9. Verify that the source and target are mapped correctly.
  10. After validating all the configurations, restart the AWS DMS task. Because the task has been created and never started, choose Restart/Resume to start full load and CDC.
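If you prefer to start the task programmatically instead of from the console, a rough boto3 equivalent (the task ARN is a placeholder) is:

import boto3

dms = boto3.client("dms")

# For a task that has never been started, 'start-replication' runs the full load and then
# continues with CDC; 'reload-target' restarts an existing task from scratch.
dms.start_replication_task(
    ReplicationTaskArn="arn:aws:dms:us-west-2:123456789012:task:EXAMPLETASK",  # placeholder
    StartReplicationTaskType="start-replication",
)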

After data migration starts, you can see it listed under Table statistics. For more information, see How do I use table statistics to monitor an AWS DMS task?

AWS DMS completes the full load first and migrates change data as files are uploaded to the bucket location specified in the cdcPath parameter.

  1. While the migration is in progress, on the Kinesis console, check the IncomingBytes metrics on the Monitoring tab to confirm the data is streaming to Kinesis Data Streams.
  2. To confirm that the data streamed is being consumed by the Lambda consumer, use the GetRecords.Bytes metric.

You’re now ready to validate the records in Lambda. Lambda is configured to read from Kinesis through a trigger.

The Lambda consumer for this post is a sample function that consumes the records from the Kinesis data stream, decodes the base64 encoded data, and prints the records to the Amazon CloudWatch log group.
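A minimal sketch of such a consumer (not the exact function deployed by the template) might look like the following:

import base64
import json

def lambda_handler(event, context):
    # Each Kinesis record carries the AWS DMS-produced JSON message, base64 encoded.
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        message = json.loads(payload)
        # print() writes the record to the function's CloudWatch log group.
        print(json.dumps(message))
    return {"records_processed": len(event["Records"])}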

  1. On the Monitoring tab, open the most recent log stream under CloudWatch Logs Insights to see the printed records.

For more information about monitoring, see Monitoring functions in the AWS Lambda console.

You can add processing logic to the Lambda function as per your requirements to aggregate or process the records. You can also configure a Lambda destination for further processing. Lambda asynchronous invocations can put an event or message on Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), or Amazon EventBridge. For more information, see Introducing AWS Lambda Destinations.

Best practice considerations

When implementing this solution, consider the following best practices:

  • Full load allows you to stream existing data from an S3 bucket to Kinesis. You can use full load to migrate previously stored data before streaming CDC data. The full load data should already exist before the task starts. For new CDC files, the data is streamed to Kinesis on a file delivery event in real time.
  • For loading multiple tables, you can specify the table count and table properties in an external table definition file. The CDC path remains the same and AWS DMS maps the records to tables based on the metadata fields.
  • During a heavy workload, the AWS DMS instance can be constrained to resources like CPU, memory, storage, and I/O. For optimal transfer speed, monitor the CloudWatch metrics and scale the replication instance.
  • For migrating a large number of tables, you can speed up the transfer by setting the multi-threading parameter to higher values.
  • The CloudFormation template creates a data stream with two shards. As the data flow rate to the stream increases, you can scale the number of shards in the stream to adapt to changes. Monitoring Kinesis with CloudWatch metrics for IncomingRecords and WriteProvisionedThroughputExceeded provides insights on how to scale the shards.
  • Object mapping in the AWS DMS task defines the partition key. This partition key is used to group data by shard within a stream. The default partition key AWS DMS uses is TableName. You can use attribute mapping to change the partition key to a value of one of the fields in the JSON, or the primary key of the table in the source database. You can also set the partition key to a constant value to stream all the data to a single shard in the stream.
  • By default, Lambda invokes the function as soon as records are available in the stream. To avoid invoking the function with a small number of records, configure the event source to buffer records for up to 5 minutes by configuring a batch window. For more information, see Using AWS Lambda with Amazon Kinesis.
  • When Kinesis is configured as a trigger for Lambda, you can increase the concurrency to process multiple batches from each shard in parallel. Lambda can process up to 10 batches in each shard simultaneously. For more information about concurrency, see New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.
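To illustrate the last two considerations above, the event source mapping could be configured roughly as follows (the stream ARN and function name are placeholders):

import boto3

lambda_client = boto3.client("lambda")

# Buffer records for up to 5 minutes and process up to 10 batches per shard in parallel.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-west-2:123456789012:stream/dms-stream",  # placeholder
    FunctionName="kinesis-consumer",  # placeholder
    StartingPosition="LATEST",
    BatchSize=500,
    MaximumBatchingWindowInSeconds=300,
    ParallelizationFactor=10,
)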

Cleaning up

After successful testing and validation, you should delete all the resources deployed through the CloudFormation template to avoid any unwanted costs. First empty the S3 bucket and stop the AWS DMS task. Then delete the appropriate stacks on the AWS CloudFormation console.

Summary

This post describes a solution for converting batch processing to near-real-time processing using AWS DMS. This solution greatly simplifies the process of migrating records from Amazon S3 to Kinesis for analysis. Kinesis as an AWS DMS target allows multiple systems to consume data simultaneously. Having a near-real-time streaming pipeline allows you to make sense of all the changes as they happen, which ultimately improves your organization’s decision-making. All the resources used in this solution scale seamlessly and allow you to focus on analysis, alerting, reporting, and fraud detection instead of platform setup and maintenance. This promotes cost-effectiveness while reducing operational burden.


About the Author

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.


Charishma Makineni is a Technical Account Manager at AWS. She works with enterprise customers to help them build secure and scalable solutions on the AWS cloud. She is focused on Big data and Analytics technologies. Outside of work, Charishma enjoys being outdoors, gardening and experimenting with cooking.


Suresh Patnam is a Solutions Architect at AWS. He helps customers innovate on the AWS platform by building highly available, scalable, and secure architectures on Big Data and AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family.

Analyzing Amazon S3 server access logs using Amazon ES

Post Syndicated from Mahesh Goyal original https://aws.amazon.com/blogs/big-data/analyzing-amazon-s3-server-access-logs-using-amazon-es/

When you use Amazon Simple Storage Service (Amazon S3) to store corporate data and host websites, you need additional logging to monitor access to your data and the performance of your application. An effective logging solution enhances security and improves the detection of security incidents. As your data storage needs grow, you may rely on Amazon S3 for a wide range of use cases while also looking for ways to analyze your logs to ensure compliance, perform audits, and discover risks.

Amazon S3 lets you monitor the traffic using the server access logging feature. With server access logging, you can capture and monitor the traffic to your S3 bucket at any time, with detailed information about the source of the request. The logs are stored in an S3 bucket that you own, in the same Region. This addresses the security and compliance requirements of most organizations. The logs are critical for establishing baselines, analyzing access patterns, and identifying trends. For example, the logs could answer a financial organization’s question about how many requests are made to a bucket and who is making what type of access requests to the objects.

You can discover insights from server access logs through several different methods. One common option is to use Amazon Athena or Amazon Redshift Spectrum to query the log files stored in Amazon S3. However, this approach suffers from high latency as the log volume grows, and it requires further integration with Amazon QuickSight to add visualization capabilities.

You can address this by using Amazon Elasticsearch Service (Amazon ES). Amazon ES is a managed service that makes it easier to deploy, operate, and scale Elasticsearch clusters in the AWS Cloud. Elasticsearch is a popular open-source search and analytics engine for use cases such as log analytics, real-time application monitoring, and clickstream analysis. The service provides support for open-source Elasticsearch APIs, managed Kibana, and integration with other AWS services such as Amazon S3 and Amazon Kinesis for loading streaming data into Amazon ES.

This post walks you through automating ingestion of server access logs from Amazon S3 into Amazon ES using AWS Lambda and visualizing the data in Kibana.

Architecture overview

Server access logging is enabled on source buckets, and logs are delivered to access log bucket. The access log bucket is configured to send an event to the Lambda function when a log file is created. On an event trigger, the Lambda function reads the file, processes the access log, and sends it to Amazon ES. When the logs are available, you can use Kibana to create interactive visuals and analyze the logs over a time period.
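The function deployed by the CloudFormation template is discussed in the GitHub project; the following is only a simplified sketch of the idea, assuming basic authentication against the fine-grained access control user and capturing just a few fields of the access log format (the domain endpoint, credentials, and regular expression are illustrative):

import json
import re

import boto3
import requests  # assumed to be packaged with the function
from requests.auth import HTTPBasicAuth

s3 = boto3.client("s3")

ES_ENDPOINT = "https://vpc-access-log-domain.us-east-1.es.amazonaws.com"  # placeholder
AUTH = HTTPBasicAuth("adminuser01", "StrongP@ssw0rd")  # placeholder; fetch from Secrets Manager in practice

# Captures only a few fields of the S3 server access log format, for illustration.
LOG_PATTERN = re.compile(
    r'^\S+ (?P<bucket>\S+) \[(?P<time>[^\]]+)\] (?P<remote_ip>\S+) \S+ \S+ '
    r'(?P<operation>\S+) (?P<key>\S+) "(?P<request_uri>[^"]*)" (?P<http_status>\S+)'
)

def lambda_handler(event, context):
    bulk_lines = []
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        for line in body.splitlines():
            match = LOG_PATTERN.match(line)
            if match:
                bulk_lines.append(json.dumps({"index": {"_index": "access-log"}}))
                bulk_lines.append(json.dumps(match.groupdict()))
    if bulk_lines:
        # The _bulk API indexes all parsed log lines in a single request.
        requests.post(
            f"{ES_ENDPOINT}/_bulk",
            data="\n".join(bulk_lines) + "\n",
            auth=AUTH,
            headers={"Content-Type": "application/x-ndjson"},
        )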

When designing a log analytics solution for high-frequency incoming data, you should consider buffering layers to avoid instability in the system. Buffering helps you streamline processes for unpredictable incoming log data. For such use cases, you can take advantage of managed services like Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Streaming services buffer data before delivering it to Amazon ES. This helps you avoid overwhelming your cluster with spiky ingestion events. Kinesis Data Firehose can reliably load data into Amazon ES. Kinesis Data Firehose lets you choose a buffer size of 1–100 MiB and a buffer interval of 60–900 seconds when Amazon ES is selected as the destination. Kinesis Data Firehose also scales automatically to match the throughput of your data and requires no ongoing administration. For more information, see Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose.

The following diagram illustrates the solution architecture.

Prerequisites

Before creating resources in AWS CloudFormation, you must enable server access logging on the source bucket. Open the S3 bucket’s properties, enable server access logging, and note the delivery bucket for the logs. See the following screenshot.

You also need an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console and related AWS services. The user must have access to create IAM roles and policies via the CloudFormation template.

Setting up the resources with AWS CloudFormation

First, deploy the CloudFormation template to create the core components of the architecture. AWS CloudFormation automates the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and multiple accounts with the least amount of effort and time.

  1. Sign in to the console and choose the Region of the bucket storing the access log. For this post, I use us-east-1.
  2. Launch the stack:
  3. Choose Next.
  4. For Stack name, enter a name.
  5. On the Parameters page, enter the following parameters:
    1. VPC Configuration – Select any VPC that has at least two private subnets. The template deploys the Amazon ES service domain and Lambda within the VPC.
    2. Private subnets – Select two private subnets of the VPC. The route tables associated with subnets must have a NAT gateway configuration and VPC endpoint for Amazon S3 to privately connect the bucket from Lambda.
    3. Access log S3 bucket – Enter the S3 bucket where access logs are delivered. The template configures event notification on the bucket to trigger the Lambda function.
    4. Amazon ES domain name – Specify the Amazon ES domain name to be deployed through the template.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Acknowledge resource creation under Capabilities and transforms and choose Create.

The stack takes about 10–15 minutes to complete. The CloudFormation stack does the following:

  • Creates an Amazon ES domain with fine-grained access control enabled on it. Fine-grained access control is configured with a primary user in the internal user database.
  • Creates an IAM role for the Lambda function with the required permissions to read from the S3 bucket and write to Amazon ES.
  • Creates the Lambda function within the same VPC as the Amazon ES elastic network interfaces (ENIs). Amazon ES places an ENI in the VPC for each of your data nodes. The communication from Lambda to the Amazon ES domain is via these ENIs.
  • Configures file create event notification on Access log S3 bucket to trigger the Lambda function. The function code segments are discussed in detail in this GitHub project.

You must make several considerations before you proceed with a production-grade deployment. For this post, I use one primary shard with no replicas. As a best practice, we recommend deploying your domain into three Availability Zones with at least two replicas. This configuration lets Amazon ES distribute replica shards to different Availability Zones than their corresponding primary shards and improves the availability of your domain. For more information about sizing your Amazon ES, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

We recommend setting the shard count based on your estimated index size, using 50 GB as a maximum target shard size. You should also define an index template to set the primary and replica shard counts before index creation. For more information about best practices, see Best practices for configuring your Amazon Elasticsearch Service domain.
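For example, a legacy index template that fixes the shard and replica counts for the access log indexes could be created with a single call before any data is indexed (the endpoint and credentials are placeholders, and the counts should follow your own sizing):

import requests
from requests.auth import HTTPBasicAuth

ES_ENDPOINT = "https://vpc-access-log-domain.us-east-1.es.amazonaws.com"  # placeholder

# Applies to any index whose name starts with "access-log".
template = {
    "index_patterns": ["access-log*"],
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 2,
    },
}

response = requests.put(
    f"{ES_ENDPOINT}/_template/access-log-template",
    json=template,
    auth=HTTPBasicAuth("adminuser01", "StrongP@ssw0rd"),  # placeholder credentials
)
response.raise_for_status()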

For high-frequency incoming data, you can rotate indexes either per day or per week depending on the size of data being generated. You can use Index State Management to define custom management policies to automate routine tasks and apply them to indexes and index patterns.

Creating the Kibana user

With Amazon ES, you can configure fine-grained users to control access to your data. Fine-grained access control adds multiple capabilities to give you tighter control over your data. This feature includes the ability to use roles to define granular permissions for indexes, documents, or fields and to extend Kibana with read-only views and secure multi-tenant support. For more information on granular access control, see Fine-Grained Access Control in Amazon Elasticsearch Service.

For this post, you create a fine-grained role for Kibana access and map it to a user.

  1. Navigate to Kibana and enter the primary user credentials:
    1. User name – adminuser01
    2. Password – StrongP@ssw0rd

To access Kibana, you must have access to the VPC. For more information about accessing Kibana, see Controlling Access to Kibana.

  1. Choose Security, Roles.
  2. For Role name, enter kibana_only_role.
  3. For Cluster-wide permissions, choose cluster_composite_ops_ro.
  4. For Index patterns, enter access-log and kibana.
  5. For Permissions: Action Groups, choose read, delete, index, and manage.
  6. Choose Save Role Definition.
  7. Choose Security, Internal User Database, and Create a New User, and create the user kibanauser01.
  8. For Open Distro Security Roles, choose kibana_only_role (created earlier).
  9. Choose Submit.

The user kibanauser01 now has full access to Kibana and the access-log indexes. You can log in to Kibana with this user and create the visuals and dashboards.

Building dashboards

You can use Kibana to build interactive visuals and analyze the trends and combine the visuals for different use cases in a dashboard. For example, you may want to see the number of requests made to the buckets in the last two days.

  1. Log in to Kibana using kibanauser01.
  2. Create an index pattern and set the time range.
  3. On the Visualize section of your Kibana dashboard, add a new visualization.
  4. Choose Vertical Bar.

You can select any time range and visual based on your requirements.

  1. Choose the index pattern and then configure your graph options.
  2. In the Metrics pane, expand Y-Axis.
  3. For Aggregation, choose Count.
  4. For Custom Label, enter Request Count.
  5. Expand the X-Axis.
  6. For Aggregation, choose Terms.
  7. For Field, choose bucket.
  8. For Order By, choose metric: Request Count.
  9. Choose Apply changes.
  10. Choose Add sub-bucket and expand the Split Series.
  11. For Sub Aggregation, choose Date Histogram.
  12. For Field, choose requestdatetime.
  13. For Interval, choose Daily.
  14. Apply the changes by choosing the play icon at the top of the page.

You should see the visual on the right side, similar to the following screenshot.

You can combine graphs of different use cases into a dashboard. I have built some example graphs for general use cases like the number of operations per bucket, user action breakdown for buckets, HTTP status rate, top users, and tabular formatted error details. See the following screenshots.

Cleaning up

Delete all the resources deployed through the CloudFormation template to avoid any unintended costs.

  1. Disable the access log on the source bucket.
  2. On the AWS CloudFormation console, identify the stacks appropriately and delete them.

Summary

This post detailed a solution to visualize and monitor Amazon S3 access logs using Amazon ES to ensure compliance, perform security audits, and discover risks and patterns at scale with minimal latency. To learn about best practices of Amazon ES, see Amazon Elasticsearch Service Best Practices. To learn how to analyze and create a dashboard of data stored in Amazon ES, see the AWS Security Blog.


About the Authors

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.


Implementing LDAP authentication for Hive on a multi-tenant Amazon EMR cluster

Post Syndicated from Kiran Erra original https://aws.amazon.com/blogs/big-data/implementing-ldap-authentication-for-hive-on-a-multi-tenant-amazon-emr-cluster/

As Amazon EMR continues its widespread adoption, it’s important to enforce separation of duties using role-based access when submitting your Hive jobs on EMR clusters in multi-tenant environments. In this post, we walk through the steps to set up authentication for Hive using Lightweight Directory Access Protocol (LDAP) and Microsoft Active Directory Domain Controller.

Solution overview

In a multi-tenant environment, it’s critical to enforce role-based access when submitting Hive jobs to an EMR cluster. The default way of submitting a Hive job to an EMR cluster is the Add Step functionality, but such a setup doesn’t enforce role-based access because Amazon EMR steps are always submitted as the default Hive user. This post outlines the process by which you can enforce EMRFS role mappings when an Active Directory user submits a Hive job after authenticating via LDAP and Microsoft Active Directory Domain Controller. The following diagram illustrates the infrastructure provisioned by AWS CloudFormation.

The following AWS services are used as part of the recommended solution:

  • AWS Secrets Manager – AWS Secrets Manager helps you protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle.
  • Amazon EMR – Amazon EMR makes it easy to process large amounts of data efficiently. Amazon EMR uses Hadoop processing combined with several AWS products to do tasks such as web indexing, data mining, log file analysis, machine learning, scientific simulation, and data warehousing.
  • Amazon EC2 – Amazon Elastic Compute Cloud (Amazon EC2) provides secure, resizable compute capacity in the cloud. It’s designed to make web-scale cloud computing easier for developers.

In our solution (as we discuss it in this post), the corporate user base is maintained in the Microsoft Active Directory Domain Controller. The EMR cluster is integrated with AD using a bootstrap action so that you can securely submit Hive jobs using Beeline by establishing an LDAP connection from an edge node (represented by an EC2 instance). The user credentials are stored in and fetched from Secrets Manager when establishing the Beeline connection.

Prerequisites

Before getting started, you must have the following prerequisites:

  • Microsoft Active Directory Domain Controller needs to be installed and set up. For a quick setup of Microsoft Active Directory Domain Controller and VPC, see the step Launch and configure an Active Directory domain controller in the Deploying each component individually section of the post Implement perimeter security in Amazon EMR using Apache Knox.
  • A valid AWS account with access to AWS services.
  • An Amazon VPC with a public subnet.
  • An AWS Identity and Access Management (IAM) policy for Secrets Manager permissions.

Implementing the solution

We provide the CloudFormation template in this post as a general guide. Please review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use. The CloudFormation template has the following steps:

  1. Start an EMR cluster with the configuration from the parameters.
  2. Integrate the EMR cluster with AD using a bootstrap action.
  3. Create and launch an EC2 instance to test the integration.
  4. Add an inbound rule to the Amazon EMR primary additional security group to allow port 10000 on the newly launched EC2 instance.

This section describes how to use the CloudFormation template to launch an EMR cluster with the following parameters:

  • ClusterName (default: emr-ldap4hive) – The name of the cluster.
  • CoreInstanceType (default: m4.xlarge) – The instance type of the core nodes.
  • CoreNodeCount (default: 2) – The number of core nodes in the cluster.
  • CreateLogBucket (default: FALSE) – A Boolean flag that indicates whether to set up a bucket for logs.
  • KeyPair – The key pair used to log in to the EC2 instance for validation.
  • MasterInstanceType (default: m4.xlarge) – The instance type of the master node.
  • ReleaseLabel (default: emr-6.0.0) – The Amazon EMR version. This template is tested with emr-6.0.0 or emr-5.29.0.
  • RemoteAccessCIDR – The CIDR range allowed to access Amazon EMR. This is usually the same as the IP address of the local machine.
  • VPCID – The VPC ID used in the Amazon EMR configuration. Make sure you select a public VPC.
  • SubnetId – The subnet ID used in the Amazon EMR configuration. Make sure you select a subnet that belongs to the selected VPC.
  • ldapurl – The LDAP URL of the AD domain controller, in the format ldap://<Private IP of AD domain controller>:389. Refer to the first item in the Prerequisites section.
  • passwd4awsadmin (default: Password@123) – The AD admin password. Must be at least eight characters containing letters, numbers, and symbols.
  • EC2 AMI (default: ami-0ac80df6eff0e70b5) – The AMI used to create the EC2 instance for validation.
  • My IP – The IP address of the local machine.

The following screenshot shows the Specify stack details page when launching your template.

A bootstrap script ldap-bootstrap.sh is invoked during the cluster creation to perform the following actions:

  • Fetch the login credentials for the Active Directory domain admin from Secrets Manager
  • Perform the realm join using the credentials fetched
  • Enable password-based authentication to the cluster

To deploy the template into your account, choose Launch Stack:

The following screenshot shows the EMR cluster that the CloudFormation stack created.

Validating the solution

To validate the solution, SSH to the Ubuntu EC2 instance using the EC2 key pair, as shown in the following screenshot. Refer to the Outputs tab from your AWS CloudFormation stack.

For this post, we used the Ubuntu Server 18.04 LTS (HVM), SSD Volume Type – ami-07ebfd5b3428b6f4d (64-bit x86) / ami-0400a1104d5b9caa1 (64-bit Arm) AMI.

You should see the Python Hive beeline script in /home/ubuntu:

Run demo-hive-beeline.py as shown in the following screenshot. This Python script fetches the AD credentials from Secrets Manager, establishes a beeline connection for Hive on Amazon EMR, submits Hive commands to create an external table for the NYC taxi dataset located in your Amazon Simple Storage Service (Amazon S3) bucket, and runs a sample select statement on the table.

The script has the following parameters:

  • -r or --region_name – AWS Region
  • -s or --secret-id – Secret ARN
  • -h or --host-name – Amazon EMR public DNS address
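
The script itself is provisioned on the instance by the CloudFormation stack. Purely as an illustration of the flow it implements, the following is a minimal Python sketch that fetches the AD credentials from Secrets Manager and invokes Beeline over an LDAP-authenticated connection. The secret key names (username, password) and the Hive statement are assumptions for the sketch, not the exact contents of demo-hive-beeline.py.

import argparse
import json
import subprocess

import boto3


def main():
    # Mirror the documented parameters; add_help is disabled so -h can be used for host-name
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("-r", "--region_name", required=True, help="AWS Region")
    parser.add_argument("-s", "--secret-id", dest="secret_id", required=True, help="Secret ARN")
    parser.add_argument("-h", "--host-name", dest="host_name", required=True,
                        help="Amazon EMR public DNS address")
    args = parser.parse_args()

    # Fetch the AD user credentials from Secrets Manager
    secrets = boto3.client("secretsmanager", region_name=args.region_name)
    secret = json.loads(secrets.get_secret_value(SecretId=args.secret_id)["SecretString"])

    # Connect to HiveServer2 (port 10000) with Beeline, authenticating as the AD user via LDAP
    jdbc_url = f"jdbc:hive2://{args.host_name}:10000/default"
    subprocess.run(
        ["beeline", "-u", jdbc_url,
         "-n", secret["username"], "-p", secret["password"],
         "-e", "SHOW DATABASES;"],  # the real script creates the NYC taxi external table and queries it
        check=True,
    )


if __name__ == "__main__":
    main()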

Cleaning up

Delete the CloudFormation stack to clean up all the resources created in this post. Also, stop the EC2 Ubuntu instance that you created in the verification step. If you used the nested stack, AWS CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.
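
If you prefer to script the cleanup, the following is a minimal boto3 sketch; the stack name and instance ID are placeholders for the values shown on your AWS CloudFormation Outputs tab and the Amazon EC2 console.

import boto3

cloudformation = boto3.client("cloudformation", region_name="us-east-1")
ec2 = boto3.client("ec2", region_name="us-east-1")

# Delete the CloudFormation stack that created the EMR cluster and related resources
cloudformation.delete_stack(StackName="emr-ldap4hive-stack")

# Stop the Ubuntu EC2 instance used for validation
ec2.stop_instances(InstanceIds=["i-0123456789abcdef0"])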

Conclusion

In this post, we went through the setup and validation of LDAP authentication for Hive on an EMR cluster. This approach decouples the authentication mechanism from Hive and Amazon EMR, and uses LDAP and the Active Directory Domain Controller as the system of record for users.


About the authors

Kiran Erra is a data architect with AWS. He works with AWS customers to provide guidance and technical assistance about Big Data, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.

Rajarao Vijjapu is a security data architect with AWS. He works with AWS customers and partners to provide guidance and technical assistance about Big Data, Analytics, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.

Enhanced monitoring and automatic scaling for Apache Flink

Post Syndicated from Karthi Thyagarajan original https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/

Thousands of developers use Apache Flink to build streaming applications to transform and analyze data in real time. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications. Monitoring and scaling your applications is critical to keep your applications running successfully in a production environment.

Amazon Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Amazon Kinesis Data Analytics manages the underlying Apache Flink components that provide durable application state, metrics and logs, and more. Kinesis Data Analytics recently announced new Amazon CloudWatch metrics and the ability to create custom metrics to provide greater visibility into your application.

In this post, we show you how to easily monitor and automatically scale your Apache Flink applications with Amazon Kinesis Data Analytics. We walk through three examples. First, we create a custom metric in the Kinesis Data Analytics for Apache Flink application code. Second, we use application metrics to automatically scale the application. Finally, we share a CloudWatch dashboard for monitoring your application and recommend metrics that you can alarm on.

Custom metrics

Kinesis Data Analytics uses Apache Flink’s metrics system to send custom metrics to CloudWatch from your applications. For more information, see Using Custom Metrics with Amazon Kinesis Data Analytics for Apache Flink.

We use a basic word count program to illustrate the use of custom metrics. The following code shows how to extend RichFlatMapFunction to track the number of words it sees. This word count is then surfaced via the Flink metrics API.

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     
            private transient Counter counter;
     
            @Override
            public void open(Configuration config) {
                this.counter = getRuntimeContext().getMetricGroup()
                        .addGroup("kinesisanalytics")
                        .addGroup("Service", "WordCountApplication")
                        .addGroup("Tokenizer")
                        .counter("TotalWords");
            }
     
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
     
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        counter.inc();
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }

Custom metrics emitted through the Flink metrics API are forwarded to CloudWatch metrics by Kinesis Data Analytics for Apache Flink. The following screenshot shows the word count metric in CloudWatch.

Custom automatic scaling

This section describes how to implement an automatic scaling solution for Kinesis Data Analytics for Apache Flink based on CloudWatch metrics. You can configure Kinesis Data Analytics for Apache Flink to perform CPU-based automatic scaling. However, you can automatically scale your application based on something other than CPU utilization. To perform custom automatic scaling, use Application Auto Scaling with the appropriate metric.

For applications that read from a Kinesis stream source, you can use the metric millisBehindLatest. This captures how far behind your application is from the head of the stream.

A target tracking policy is one of two scaling policy types offered by Application Auto Scaling. You can specify a threshold value around which to vary the degree of parallelism of your Kinesis Data Analytics application. The following sample code on GitHub configures Application Auto Scaling when millisBehindLatest for the consuming application exceeds 1 minute. This increases the parallelism, which increases the number of KPUs.
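
The GitHub sample implements this with CloudFormation, API Gateway, and Lambda, as described below. Purely to illustrate the Application Auto Scaling API surface involved, the following boto3 sketch registers a custom scalable target and attaches a target tracking policy on millisBehindLatest. The endpoint URL, capacities, cooldowns, and metric dimensions are placeholders; match the metric specification to what your application actually reports in CloudWatch.

import boto3

aas = boto3.client("application-autoscaling")

# For a custom resource, the resource ID is the scaler API endpoint created by the
# CloudFormation template (an API Gateway URL); the value below is a placeholder.
resource_id = "https://example.execute-api.us-east-1.amazonaws.com/prod/scalableTargetDimensions/flinkApp"

# Register the Kinesis Data Analytics application as a custom scalable target
aas.register_scalable_target(
    ServiceNamespace="custom-resource",
    ResourceId=resource_id,
    ScalableDimension="custom-resource:ResourceType:Property",
    MinCapacity=1,
    MaxCapacity=8,
)

# Target tracking on millisBehindLatest: keep the consumer within about 1 minute of the stream head
aas.put_scaling_policy(
    PolicyName="flink-millisbehindlatest-target-tracking",
    ServiceNamespace="custom-resource",
    ResourceId=resource_id,
    ScalableDimension="custom-resource:ResourceType:Property",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 60000,  # milliseconds
        "CustomizedMetricSpecification": {
            "MetricName": "millisBehindLatest",
            "Namespace": "AWS/KinesisAnalytics",
            "Dimensions": [
                {"Name": "Application", "Value": "my-flink-application"},
                {"Name": "Id", "Value": "my_input_stream"},
                {"Name": "Flow", "Value": "Input"},
            ],
            "Statistic": "Maximum",
        },
        "ScaleInCooldown": 600,
        "ScaleOutCooldown": 600,
    },
)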

The following diagram shows how Application Auto Scaling, used with Amazon API Gateway and AWS Lambda, scales a Kinesis Data Analytics application in response to a CloudWatch alarm.

The sample code includes examples for automatic scaling based on the target tracking policy and step scaling policy.

Automatic scaling solution components

The following is a list of key components used in the automatic scaling solution. You can find these components in the AWS CloudFormation template in the GitHub repo accompanying this post.

  • Application Auto Scaling scalable target – A scalable target is a resource that Application Auto Scaling can scale in and out. It’s uniquely identified by the combination of resource ID, scalable dimension, and namespace. For more information, see RegisterScalableTarget.
  • Scaling policy – The scaling policy defines how your scalable target should scale. As described in PutScalingPolicy, Application Auto Scaling supports two policy types: TargetTrackingScaling and StepScaling. In addition, you can configure a scheduled scaling action using Application Auto Scaling. If you specify TargetTrackingScaling, Application Auto Scaling also creates the corresponding CloudWatch alarms for you.
  • API Gateway – Because the scalable target is a custom resource, we have to specify an API endpoint that Application Auto Scaling invokes to perform scaling and to get information about the current state of our scalable resource. We use API Gateway and a Lambda function to implement this endpoint.
  • Lambda – API Gateway invokes the Lambda function, which performs the scaling actions and returns information, such as the current scale value, to Application Auto Scaling.

Additionally, you should be aware of the following:

  • When scaling out or in, this sample only updates the overall parallelism. It doesn’t adjust the parallelism per KPU.
  • When scaling occurs, the Kinesis Data Analytics application experiences downtime.
  • The throughput of a Flink application depends on many factors, such as complexity of processing and destination throughput. The step-scaling example assumes a relationship between incoming record throughput and scaling. The millisBehindLatest metric used for target tracking automatic scaling works the same way.
  • We recommend using the default scaling policy provided by Kinesis Data Analytics for CPU-based scaling, the target tracking auto scaling policy for the millisBehindLatest metric, and a step scaling auto scaling policy for a metric such as numRecordsInPerSecond. However, you can use any automatic scaling policy for the metric you choose.

CloudWatch operational dashboard

Customers often ask us about best practices and the operational aspects of Kinesis Data Analytics for Apache Flink. We created a CloudWatch dashboard that captures the key metrics to monitor. We categorize the most common metrics in this dashboard with the recommended statistics for each metric.

This GitHub repo contains a CloudFormation template to deploy the dashboard for any Kinesis Data Analytics for Apache Flink application. You can also deploy a demo application with the dashboard. The dashboard includes the following:

  • Application health metrics:
    • Use uptime to see how long the job has been running without interruption and downtime to determine if a job failed to run. Non-zero downtime can indicate issues with your application.
    • Higher-than-normal job restarts can indicate an unhealthy application.
    • Checkpoint information (size, duration, and number of failed checkpoints) can help you understand application health and progress. Increasing checkpoint duration values can signify application health problems like backpressure and the inability to keep up with input data. Increasing checkpoint size over time can point to an infinitely growing state that can lead to out-of-memory errors.
  • Resource utilization metrics:
    • You can check the CPU and heap memory utilization along with the thread count. You can also check the garbage collection time taken across all Flink task managers.
  • Flink application progress metrics:
    • numRecordsInPerSecond and numRecordsOutPerSecond show the number of records accepted and emitted per second.
    • numLateRecordsDropped shows the number of records this operator or task has dropped due to arriving late.
    • Input and output watermarks are valid only when using event time semantics. You can use the difference between these two values to calculate event time latency.
  • Source metrics:
    • The Kinesis Data Streams-specific metric millisBehindLatest shows how many milliseconds the consumer is behind the head of the stream, indicating how far behind current time the consumer is. We used this metric to demonstrate Application Auto Scaling earlier in this post.
    • The Kafka-specific metric recordsLagMax shows the maximum lag in terms of number of records for any partition in this window.

The dashboard contains useful metrics to gauge the operational health of a Flink application. You can modify the threshold, configure additional alarms, and add other system or custom metrics to customize the dashboard for your use. The following screenshot shows a section of the dashboard.
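
For example, if you want to be paged on non-zero downtime outside the dashboard, a hedged boto3 sketch might look like the following; the application name, thresholds, and SNS topic ARN are placeholders, and the metric name and dimensions should match what your application reports in CloudWatch.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Alarm when the application reports any downtime over two consecutive one-minute periods.
# Replace the application name and SNS topic ARN with your own values.
cloudwatch.put_metric_alarm(
    AlarmName="flink-app-downtime",
    Namespace="AWS/KinesisAnalytics",
    MetricName="downtime",
    Dimensions=[{"Name": "Application", "Value": "my-flink-application"}],
    Statistic="Average",
    Period=60,
    EvaluationPeriods=2,
    Threshold=0,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="breaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:flink-ops-alerts"],
)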

Summary

In this post, we covered how to use the enhanced monitoring features for Kinesis Data Analytics for Apache Flink applications. We created custom metrics for an Apache Flink application within application code and emitted it to CloudWatch. We also used Application Auto Scaling to scale an application. Finally, we shared a CloudWatch dashboard to monitor the operational health of Kinesis Data Analytics for Apache Flink applications. For more information about using Kinesis Data Analytics, see Getting Started with Amazon Kinesis Data Analytics.


About the Authors

Karthi Thyagarajan is a Principal Solutions Architect on the Amazon Kinesis team.

Deepthi Mohan is a Sr. TPM on the Amazon Kinesis Data Analytics team.

Stream CDC into an Amazon S3 data lake in Parquet format with AWS DMS

Post Syndicated from Viral Shah original https://aws.amazon.com/blogs/big-data/stream-cdc-into-an-amazon-s3-data-lake-in-parquet-format-with-aws-dms/

Most organizations generate data in real time and ever-increasing volumes. Data is captured from a variety of sources, such as transactional and reporting databases, application logs, customer-facing websites, and external feeds. Companies want to capture, transform, and analyze this time-sensitive data to improve customer experiences, increase efficiency, and drive innovation. With increased data volume and velocity, it’s imperative to capture the data from source systems as soon as it’s generated and store it on a secure, scalable, and cost-efficient platform.

AWS Database Migration Service (AWS DMS) performs continuous data replication using change data capture (CDC). Using CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume and act on. Most database management systems manage a transaction log that records changes made to the database contents and metadata. AWS DMS reads the transaction log by using engine-specific API operations and functions and captures the changes made to the database in a nonintrusive manner.

Amazon Simple Storage Service (Amazon S3) is the largest and most performant object storage service for structured and unstructured data and the storage service of choice to build a data lake. With Amazon S3, you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected with 99.999999999% (11 9s) of durability.

AWS DMS offers two options to capture data changes from relational databases and store the data in columnar (Apache Parquet) format in Amazon S3:

  • Use AWS DMS to replicate data directly into Amazon S3 as a target, with Parquet as the output format.
  • Use AWS DMS to stream changes into Amazon Kinesis Data Streams, and use Amazon Kinesis Data Firehose to convert the records to Parquet and deliver them to Amazon S3.

The second option helps you build a flexible data pipeline to ingest data into an Amazon S3 data lake from several relational and non-relational data sources, compared to only relational data sources in the first option. Kinesis Data Firehose provides pre-built AWS Lambda blueprints for converting common data sources such as Apache logs and system logs to JSON and CSV formats, or you can write your own custom functions. It can also convert the format of incoming data from JSON to Parquet or Apache ORC before storing the data in Amazon S3. Data stored in columnar format gives you faster and lower-cost queries with downstream analytics services like Amazon Athena.

In this post, we focus on the technical challenges outlined in the second option and how to address them.

As shown in the following reference architecture, data is ingested from a database into Parquet format in Amazon S3 via AWS DMS integrating with Kinesis Data Streams and Kinesis Data Firehose.

Our solution provides flexibility to ingest data from several sources using Kinesis Data Streams and Kinesis Data Firehose with built-in data format conversion and integrated data transformation capabilities before storing data in a data lake. For more information about data ingestion into Kinesis Data Streams, see Writing Data into Amazon Kinesis Data Streams. You can then query Parquet data in Amazon S3 efficiently with Athena.

Implementing the architecture

AWS DMS can migrate data to and from most widely used commercial and open-source databases. You can migrate and replicate data directly to Amazon S3 in CSV and Parquet formats, and store data in Amazon S3 in Parquet because it offers efficient compression and encoding schemes. Parquet format allows compression schemes on a per-column level, and is future-proofed to allow adding more encodings as they are invented and implemented.

AWS DMS supports Kinesis Data Streams as a target. Kinesis Data Streams is a massively scalable and durable real-time data streaming service that can collect and process large streams of data records in real time. AWS DMS publishes records to a data stream as JSON. For more information about configuration details, see Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams.
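
As an illustration of that configuration, the following boto3 sketch creates a Kinesis target endpoint for AWS DMS; the identifiers, stream ARN, and IAM role are placeholders, and the replication instance and task setup are omitted.

import boto3

dms = boto3.client("dms", region_name="us-east-1")

# Create an AWS DMS target endpoint that publishes change records to a Kinesis data stream as JSON
dms.create_endpoint(
    EndpointIdentifier="cdc-to-kinesis-target",
    EndpointType="target",
    EngineName="kinesis",
    KinesisSettings={
        "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/dms-cdc-stream",
        "MessageFormat": "json",
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-kinesis-access-role",
    },
)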

Kinesis Data Firehose can pull data from Kinesis Data Streams. It’s a fully managed service that delivers real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose can convert the format of input data from JSON to Parquet or ORC before sending it to Amazon S3. It needs a reference schema to interpret the AWS DMS streaming data in JSON and convert it into Parquet. In this post, we use AWS Glue, a fully managed ETL service, to create a schema in the AWS Glue Data Catalog for Kinesis Data Firehose to reference.

When AWS DMS migrates records, it creates additional fields (metadata) for each migrated record. The metadata provides additional information about the record being migrated, such as source table name, schema name, and type of operation. Most metadata fields contain a hyphen (-) in their names (for example, record-type, schema-name, table-name, transaction-id). See the following code:

{
        "data": {
            "MEET_CODE": 5189459,
            "MEET_DATE": "2020-02-21T19:20:04Z",
            "RACE_CODE": 5189459,
            "LAST_MODIFIED_DATE": "2020-02-24T19:20:04Z",
            "RACE_ENTRY_CODE": 11671651,
            "HORSE_CODE": 5042811
        },
        "metadata": {
            "transaction-id": 917505,
            "schema-name": "SH",
            "operation": "insert",
            "table-name": "RACE_ENTRY",
            "record-type": "data",
            "timestamp": "2020-02-26T00:20:07.482592Z",
            "partition-key-type": "schema-table"
        }
    }

The additional metadata added by AWS DMS leads to an error during the data format conversion phase in Kinesis Data Firehose. Kinesis Data Firehose follows Hive-style formatting and therefore doesn’t recognize the hyphen (-) character in the metadata field names during data conversion from JSON into Parquet, and it returns the error message expected at the position 30 of ‘struct’ but ‘-’ is found. For example, see the following code:

{
	"deliveryStreamARN": "arn:aws:firehose:us-east-1:1234567890:deliverystream/abc-def-KDF",
	"destination": "arn:aws:s3:::abc-streaming-bucket",
	"deliveryStreamVersionId": 13,
	"message": "The schema is invalid. Error parsing the schema:
	 Error: : expected at the position 30 of 'struct<timestamp:string,record-type:string,operation:string,partition-key-type:string,schema-name:string,table-name:string,transaction-id:int>' but '-' is found.",
	"errorCode": "DataFormatConversion.InvalidSchema"
}

You can resolve the issue by making the following changes: specifying JSON key mappings and creating a reference table in AWS Glue before configuring Kinesis Data Firehose.

Specifying JSON key mappings

In your Kinesis Data Firehose configuration, specify JSON key mappings for fields with - in their names. The mapping transforms these metadata field names to use _ instead (for example, record-type changes to record_type).

Use the AWS Command Line Interface (AWS CLI) to create the Kinesis Data Firehose delivery stream with the JSON key mappings, and modify the parameters to meet your specific requirements.

Kinesis Data Firehose configuration mapping is only possible through the AWS CLI or API and not through the AWS Management Console.

The following excerpt configures Kinesis Data Firehose with five columns that have - in their source field names mapped to new field names that use _:

"S3BackupMode": "Disabled",
                    "DataFormatConversionConfiguration": {
                        "SchemaConfiguration": {
                            "RoleARN": "arn:aws:iam::123456789012:role/sample-firehose-delivery-role",
                            "DatabaseName": "sample-db",
                            "TableName": "sample-table",
                            "Region": "us-east-1",
                            "VersionId": "LATEST"
                        },
                        "InputFormatConfiguration": {
                            "Deserializer": {
                                "OpenXJsonSerDe": {
                                "ColumnToJsonKeyMappings":
                                {
                                 "record_type": "record-type","partition_key_type": "partition-key-type","schema_name":"schema-name","table_name":"table-name","transaction_id":"transaction-id"
                                }
                                }

Creating a reference table in AWS Glue

Because Kinesis Data Firehose uses the Data Catalog to reference the schema for Parquet format conversion, you must first create a reference table in AWS Glue before configuring Kinesis Data Firehose. Use Athena to create the Data Catalog table. For instructions, see CREATE TABLE. In the table, make sure that the column names use _ in their names, and manually modify them in advance through the Edit schema option for the referenced table in AWS Glue, if needed.
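
The post uses an Athena CREATE TABLE statement for this step. Purely as an alternative sketch, the following boto3 call registers an equivalent schema directly in the Data Catalog, reusing the sample-db and sample-table names from the delivery stream configuration. The data struct mirrors the sample DMS payload shown earlier and the S3 location is a placeholder; replace both with your own source schema and bucket.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# The metadata struct uses underscores so it lines up with the ColumnToJsonKeyMappings
# configured on the delivery stream; the data struct mirrors the sample DMS payload above.
metadata_type = (
    "struct<timestamp:string,record_type:string,operation:string,"
    "partition_key_type:string,schema_name:string,table_name:string,transaction_id:int>"
)
data_type = (
    "struct<meet_code:int,meet_date:string,race_code:int,"
    "last_modified_date:string,race_entry_code:int,horse_code:int>"
)

glue.create_table(
    DatabaseName="sample-db",
    TableInput={
        "Name": "sample-table",
        "TableType": "EXTERNAL_TABLE",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "data", "Type": data_type},
                {"Name": "metadata", "Type": metadata_type},
            ],
            "Location": "s3://abc-streaming-bucket/cdc/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
    },
)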

Use Athena to query the results of data ingested by Kinesis Data Firehose into Amazon S3.

This solution is only applicable in the following use cases:

  • Capturing data changes from your source with AWS DMS
  • Converting data into Parquet with Kinesis Data Firehose

If you want to store data in a non-Parquet format (such as CSV or JSON) or ingest into Kinesis through other routes, then you don’t need to modify your Kinesis Data Firehose configuration.

Conclusion

This post demonstrated how to convert AWS DMS data into Parquet format and specific configurations to make sure metadata follows the expected format of Kinesis Data Streams and Kinesis Data Firehose. We encourage you to try this solution and take advantage of all the benefits of using AWS DMS with Kinesis Data Streams and Kinesis Data Firehose. For more information, see Getting started with AWS Database Migration Service and Setting up Amazon Kinesis Firehose.

If you have questions or suggestions, please leave a comment.


About the Author

Viral Shah is a Data Lab Architect with Amazon Web Services. Viral helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab. He has over 20 years of experience working with enterprise customers and startups primarily in the Data and Database space.

Developing AWS Glue ETL jobs locally using a container

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In the fourth post of the series, we discussed optimizing memory management. In this post, we focus on writing ETL scripts for AWS Glue jobs locally. AWS Glue is built on top of Apache Spark and therefore uses all the strengths of open-source technologies. AWS Glue comes with many improvements on top of Apache Spark and has its own ETL libraries that can fast-track the development process and reduce boilerplate code.

The AWS Glue team released the AWS Glue binaries so you can set up an environment on your desktop to test your code. We have used these libraries to create an image with all the right dependencies packaged together. The image has AWS Glue 1.0, Apache Spark, OpenJDK, Maven, Python3, the AWS Command Line Interface (AWS CLI), and boto3. We have also bundled Jupyter and Zeppelin notebook servers in the image so you don’t have to configure an IDE and can start developing AWS Glue code right away.

The AWS Glue team will release new images for various AWS Glue updates. The tags of the new images will follow the following convention: glue_libs_<glue-version>_image_<image-version>. For example, glue_libs_1.0.0_image_01. In this name, 1.0 is the AWS Glue major version, .0 is the patch version, and 01 is the image version. The patch version will be incremented for updates to the AWS Glue libraries of a major release. Image version will be incremented for the release of a new image of a major AWS Glue release. Both these increments will be reset with every major AWS Glue release. So, the first image released for AWS Glue 2.0 will be glue_libs_2.0.0_image_01.

We recommend pulling the highest image version for an AWS Glue major version to get the latest updates.

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. The machine running Docker hosts the AWS Glue container. Also make sure that you have at least 7 GB of disk space for the image on the host running Docker.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Solution overview

In this post, we use amazon/aws-glue-libs:glue_libs_1.0.0_image_01 from Docker Hub. This image has only been tested for an AWS Glue 1.0 Spark shell (both for PySpark and Scala). It hasn’t been tested for an AWS Glue 1.0 Python shell.

We organize this post into the following three sections. You only have to complete one of the three sections (not all three) depending on your requirement:

  • Setting up the container to use Jupyter or Zeppelin notebooks
  • Setting up the Docker image with PyCharm Professional
  • Running against the CLI interpreter

This post uses the following two terms frequently:

  • Client – The system from which you access the notebook. You open a web browser on this system and enter the notebook URL.
  • Host – The system that hosts the Docker daemon. The container runs on this system.

Sometimes, your client and host can be the same system.

Setting up the container to use Jupyter or Zeppelin notebooks

Setting up the container to run PySpark code in a notebook includes three high-level steps:

  1. Pulling the image from Docker Hub.
  2. Running the container.
  3. Opening the notebook.

Pulling the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Open cmd on Windows or terminal on Mac and run the following command:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

Running the container

We pulled the image from Docker Hub in the previous step. We now run a container using this image.

The general format of the run command is:

docker run -itd -p <port_on_host>:<port_on_container_either_8888_or_8080> -p 4040:4040 <credential_setup_to_access_AWS_resources> --name <container_name> amazon/aws-glue-libs:glue_libs_1.0.0_image_01 <command_to_start_notebook_server>

The code includes the following information:

  • <port_on_host> – The local port of your host that is mapped to the port of the container. For our use case, the container port is either 8888 (for a Jupyter notebook) or 8080 (for a Zeppelin notebook). To keep things simple, we use the same port number as the notebook server ports on the container in the following examples.
  • <port_on_container_either_8888_or_8080> – The port of the notebook server on the container. The default port of Jupyter is 8888; the default port of Zeppelin is 8080.
  • 4040:4040 – This is required for SparkUI. 4040 is the default port for SparkUI. For more information, see Web Interfaces.
  • <credential_setup_to_access_AWS_resources> – In this section, we go with the typical case of mounting the host’s directory, containing the credentials. We assume that your host has the credentials configured using aws configure. The flow chart in the Appendix section explains various ways to set the credentials if the assumption doesn’t hold for your environment.
  • <container_name> – The name of the container. You can use any text here.
  • amazon/aws-glue-libs:glue_libs_1.0.0_image_01 – The name of the image that we pulled in the previous step.
  • <command_to_start_notebook_server> – We run /home/zeppelin/bin/zeppelin.sh for a Zeppelin notebook and /home/jupyter/jupyter_start.sh for a Jupyter notebook. If you want to run your code against the CLI interpreter, you don’t need a notebook server and can leave this argument blank.

The following example code starts a Jupyter notebook and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The following example code starts a Jupyter notebook and passes read-write credentials from a Windows host:

docker run -itd -p 8888:8888 -p 4040:4040 -v %UserProfile%\.aws:/root/.aws:rw --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

To run a Zeppelin notebook, replace 8888:8888 with 8080:8080, glue_jupyter with glue_zeppelin, and /home/jupyter/jupyter_start.sh with /home/zeppelin/bin/zeppelin.sh. For example, the following command starts a Zeppelin notebook server and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8080:8080 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_zeppelin amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/zeppelin/bin/zeppelin.sh

You can now run the following command to make sure that the container is running:

docker ps

The Jupyter notebook is configured to allow connections from all IP addresses without authentication, and the Zeppelin notebook is configured to use anonymous access. This configuration makes sure that you can start working on your local machine with just two commands (docker pull and docker run). If your scenario mandates a different configuration, run the container without running the notebook startup script (/home/jupyter/jupyter_start.sh or /home/zeppelin/bin/zeppelin.sh). This starts the container but not the notebook server. You can then run the bash shell on the container using the following command, edit the required notebook configurations, and start the notebook server:

docker exec -it <container_name> bash

For example,

docker exec -it glue_jupyter bash

The following example code is the docker run command without the notebook server startup:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01

If you’re running the container on an Amazon Elastic Compute Cloud (Amazon EC2) instance, you have to set up your inbound rules in the security group to allow communication on the ports used by the notebook server. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

Opening the notebook

If your client and host are the same machine, enter the following URL for Jupyter: http://localhost:8888.

You can write PySpark code in the notebook as shown here. You can also use SQL magic (%%sql) to directly write SQL against the tables in the AWS Glue Data Catalog. If your catalog table is on top of JSON data, you have to place json-serde.jar in the /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/jars directory of the container and restart the kernel in your Jupyter notebook. You can place the jar in this directory by first running the bash shell on the container using the following command:

docker exec -it <container_name> bash

If you have a local directory that holds your notebooks, you can mount it to /home/jupyter/jupyter_default_dir using the -v option. These notebooks are available to you when you open the Jupyter notebook URL. For example, see the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro -v C:\Users\admin\Documents\notebooks:/home/jupyter/jupyter_default_dir --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The URL for Zeppelin is http://localhost:8080.

For Zeppelin notebooks, include %spark.pyspark on the top to run PySpark code.

If your host is Amazon EC2 and your client is your laptop, replace localhost in the preceding URLs with your host’s public IP.

Depending on your network, or if you’re on a VPN, you might have to set up an SSH tunnel. The general format of the tunnel is the following code:

ssh -i <absolute_path_to_your_private_key_for_EC2> -v -N -L <port_on_client>:<ip_of_the_container>:<port_8888_or_8080> ec2-user@<public_ip_address_of_ec2_host>

Your security group controlling the EC2 instance should allow inbound on port 22 from the client. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

You can get the <ip_of_the_container> under the IPAddress field when you run docker inspect <container_name>. For example: docker inspect glue_jupyter.

If you set up the tunnel, the URL to access the notebook is: http://localhost:<port_on_client>.

Use 8888 or 8080 for <port_8888_or_8080>, depending on whether you’re running a Jupyter or Zeppelin notebook.

You can now use the following sample code to test your notebook:

from pyspark import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate()) 
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
inputDF.toDF().show()

Although awsglue-datasets is a public bucket, you at least need the following permissions, attached to the AWS Identity and Access Management (IAM) user used for your container, to view the data:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ReadOnly",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::awsglue-datasets",
                "arn:aws:s3:::awsglue-datasets/*"
            ]
        }
    ]
}

You can also see the databases in your AWS Glue Data Catalog using the following code:

spark.sql("show databases").show()

You need AWS Glue permissions to run the preceding command. The following are the minimum permissions required to run the code. Replace <account_number> with your account number and <region> with your Region:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueAccess",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account_number>:database/*",
                "arn:aws:glue:<region>:<account_number>:catalog"
            ]
        }
    ]
}

Similarly, you can query the AWS Glue Data Catalog tables too. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.
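
For example, assuming a database and table that already exist in your catalog (my_database and my_table below are placeholders), a short sketch like the following reads a catalog table into a DynamicFrame and lists the tables with Spark SQL; it needs the appropriate AWS Glue read permissions in addition to those shown earlier.

from pyspark import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# Read a Data Catalog table into a DynamicFrame; database and table names are placeholders
catalog_df = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
)
catalog_df.toDF().show()

# You can also query the catalog directly with Spark SQL
spark.sql("show tables in my_database").show()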

You can stop here if you want to develop AWS Glue code locally using only notebooks.

Setting up the Docker image with PyCharm Professional

This section talks about setting up PyCharm Professional to use the image. For this post, we use Windows. There may be a few differences when using PyCharm on a Mac.

  1. Open cmd (or terminal for Mac) and pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01 using the following command:
    docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

    If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

  2. Choose the Docker icon (right-click) and choose Settings (this step isn’t required for Mac or Linux).
  3. In the General section, select Expose daemon on tcp://localhost:2375 without TLS (this step isn’t required for Mac or Linux). Note the warning listed under the checkbox. This step is based on PyCharm documentation.
  4. Choose Apply & Restart (this step isn’t required for Mac or Linux).
  5. Choose the Docker icon (right-click) and choose Restart… if the Docker doesn’t restart automatically (this step isn’t required for Mac or Linux).
  6. Open PyCharm and create a Pure Python project (if you don’t have one).
  7. Under File, choose Settings… (for Mac, under PyCharm, choose Preferences).
  8. Under Settings, choose Project Interpreter. In the following screenshot, GlueProject is the name of my project. Your project name might be different.
  9. Choose Show All… from the drop-down menu.
  10. Choose the + icon.

  11. Choose Docker.
  12. Choose New.
  13. For Name, enter a name (for example, Docker-Glue).
  14. Keep other settings at their default.
  15. If running on Windows, for Connect to Docker daemon with, select TCP socket and enter the Engine API URL.
    For this post, we enter tcp://localhost:2375 because Docker and PyCharm are on the same Windows machine.
    If running on a Mac, select Docker for Mac. No API URL is required.
  16. Make sure you see the message Connection successful.

For Windows, if you don’t see this message, Docker may not have restarted after you changed the settings in Step 4. Restart the Docker and repeat these steps again. For more information about connection settings, see PyCharm documentation.

The following screenshots show steps 13-16 in Windows and Mac.

  17. Choose OK.

You should now see the image listed in the drop-down menu.

  18. Choose the image that you pulled from Docker Hub (amazon/aws-glue-libs:glue_libs_1.0.0_image_01).
  19. Choose OK.

You now see the interpreter listed.

  20. Choose OK.

This lists all the packages in the image.

  21. Choose OK.

Steps 22-27 help you get AWS Glue-related code completion suggestions from PyCharm.

  22. Download the following file: https://s3.amazonaws.com/aws-glue-jes-prod-us-east-1-assets/etl-1.0/python/PyGlue.zip.
  23. Under File, choose Settings (for Mac, under PyCharm, choose Preferences).
  24. Under Project: <Project name>, choose Project Structure.
  25. Choose Add Content Root.
  26. Choose the newly downloaded PyGlue.zip file.
  27. In the Settings window, choose OK.
  28. Choose the project (right-click) and choose New, Python File.
  29. Enter a name for the Python file and press Enter.
  30. Enter the following code in the file and save it. For more information about the minimum permissions required to run this code, see this section.
    from pyspark import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate()) 
    inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
    inputDF.toDF().show()
    

  31. Choose Add Configuration.
  32. Choose the + icon.
  33. Under Add New Configuration, choose Python.
  34. For Name, enter a name.
  35. For Environment variables, enter the following:
    PYTHONPATH=/home/aws-glue-libs/awsglue.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/pyspark.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python

  36. For Script path, select the newly created script in Step 29.
  37. For Python interpreter, choose the newly created interpreter.
  38. Choose Docker Container Settings.
  39. Under Volume bindings, choose the + icon.
  40. For Host path, add the absolute path of the .aws folder that holds the credentials and the config files.
  41. For Container path, add /root/.aws.
  42. Choose OK.
  43. For Run/Debug Configurations, choose OK.
  44. Run the code by choosing the green button on the top right.

You can also see the databases in your AWS Glue Data Catalog using the following code. For more information about the minimum permissions required to run this code, see this section.

spark.sql("show databases").show()

Similarly, you can also query the catalog tables. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.

PyCharm gives code completion suggestions for AWS Glue (see the following screenshot). This is possible because of the steps you completed earlier.

Running against the CLI interpreter

You can always run the bash shell on the container and run your PySpark code directly against the CLI interpreter in the container.

  1. Complete the Pulling the image from Docker Hub and Running the container steps in the section Setting up the container to use Jupyter or Zeppelin notebooks.
  2. Run the bash shell on the container by entering the following code. Replace <container_name> with the name (--name argument) you used earlier.
    docker exec -it <container_name> bash

  3. Run one of the following commands:
    1. For PySpark, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

    2. For Scala, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/spark-shell

Conclusion

In this post, we learned about a three-step process to get started on AWS Glue and Jupyter or Zeppelin notebook. Although notebooks are a great way to get started and a great asset to data scientists and data wranglers, data engineers generally have a source control repository, an IDE, and a well-defined CI/CD process. Because PyCharm is a widely used IDE for PySpark development, we showed how to use the image with PyCharm Professional. You can develop your code locally in your IDE and test it locally using the container, and your CI/CD process can run as it does with any other IDE and source control tool in your organization. Although we showed integration with PyCharm, you can similarly integrate the container with any IDE that you use to complete your CI/CD story with AWS Glue.


Appendix

The following section discusses various ways to set the credentials to access AWS resources (such as Amazon Simple Storage Service (Amazon S3), AWS Step Functions, and more) from the container.

You need to provide your AWS credentials to connect to an AWS service from the container. The AWS SDKs and CLIs use provider chains to look for AWS credentials in several different places, including system or user environment variables and in local AWS configuration files. For more information about how to set up credentials, see https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/credentials.html. To generate the credentials using the AWS Management Console, see Managing Access Keys (Console). For instructions on generating credentials with the AWS CLI, see create-access-key. For more information about generating credentials with an API, see CreateAccessKey.
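
Whichever mechanism you use, a quick way to confirm that the container can resolve credentials is to call AWS STS from a Python shell or notebook inside the container; this is only a sanity check and assumes nothing beyond boto3, which is bundled in the image.

import boto3

# boto3 walks the standard provider chain (environment variables, ~/.aws files, and so on)
session = boto3.Session()
print("Region:", session.region_name)

# If credentials resolve, this returns the account and ARN of the caller
identity = session.client("sts").get_caller_identity()
print("Account:", identity["Account"])
print("Caller ARN:", identity["Arn"])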

The following flow chart shows the various ways to set up AWS credentials for the container. Most of these mechanisms don’t work with PyCharm because we use the image there and not the container. You can use the container as an SSH interpreter in PyCharm and then use one of the credential setting mechanisms listed here. However, that discussion is out of the scope of this post.

The numbers in brackets match the code snippets that follow the chart.

(1) To find more info about the syntax of setting up the tunnel, see this.

(2) To set credentials using the docker cp command to copy credentials from the Windows host to the container, enter the following code (this example code uses the container name glue_jupyter):

docker cp %UserProfile%\.aws\.  glue_jupyter:/root/.aws

(3) To mount the host’s .aws directory on the container with rw option, see this.

(4) To mount the host’s .aws directory on the container with ro option, see this.

(5) To set the credentials in a file, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --env-file /datalab_pocs/glue_local/env_variables.txt --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

/datalab_pocs/glue_local/env_variables.txt is the absolute path of the file holding the environment variables. The file should have the following variables:

  • AWS_ACCESS_KEY_ID=<Access_id>
  • AWS_SECRET_ACCESS_KEY=<Access_key>
  • AWS_REGION=<Region>

For more information about Regions, see Regions, Availability Zones, and Local Zones.

(6) To set the credentials in the docker run command, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -e AWS_ACCESS_KEY_ID=<ID> -e AWS_SECRET_ACCESS_KEY=<Key> -e AWS_REGION=<Region>  --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

(7) To set credentials using aws configure on the container, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
docker exec -it glue_jupyter bash
aws configure


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, DW, and Data Lake projects in the US and Australia.

Amazon EMR supports Apache Hive ACID transactions

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/amazon-emr-supports-apache-hive-acid-transactions/

Apache Hive is an open-source data warehouse package that runs on top of an Apache Hadoop cluster. You can use Hive for batch processing and large-scale data analysis. Hive uses Hive Query Language (HiveQL), which is similar to SQL.

ACID (atomicity, consistency, isolation, and durability) properties make sure that the transactions in a database are atomic, consistent, isolated, and reliable.

Amazon EMR 6.1.0 adds support for Hive ACID transactions so it complies with the ACID properties of a database. With this feature, you can run INSERT, UPDATE, DELETE, and MERGE operations in Hive managed tables with data in Amazon Simple Storage Service (Amazon S3). This is a key feature for use cases like streaming ingestion, data restatement, bulk updates using MERGE, and slowly changing dimensions.

This post demonstrates how to enable Hive ACID transactions in Amazon EMR, how to create a Hive transactional table, how it can achieve atomic and isolated operations, and the concepts, best practices, and limitations of using Hive ACID in Amazon EMR.

Enabling Hive ACID in Amazon EMR

To enable Hive ACID as the default for all Hive managed tables in an EMR 6.1.0 cluster, use the following hive-site configuration:

[
   {
      "classification": "hive-site",
      "properties": {
         "hive.support.concurrency": "true",
         "hive.exec.dynamic.partition.mode": "nonstrict",
         "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"
      }
   }
]

For the complete list of configuration parameters related to Hive ACID and descriptions of the preceding parameters, see Hive Transactions.

Hive ACID use case

In this section, we explain the Hive ACID transactions with a straightforward use case in Amazon EMR.

Enter the following Hive command in the master node of an EMR cluster (6.1.0 release) and replace <s3-bucket-name> with the bucket name in your account:

hive --hivevar location=<s3-bucket-name> -f s3://aws-bigdata-blog/artifacts/hive-acid-blog/hive_acid_example.hql 

After Hive ACID is enabled on an Amazon EMR cluster, you can run the CREATE TABLE DDLs for Hive transaction tables.

To define a Hive table as transactional, set the table property transactional=true.

The following CREATE TABLE DDL is used in the script that creates a Hive transaction table acid_tbl:

CREATE TABLE acid_tbl (key INT, value STRING, action STRING)
PARTITIONED BY (trans_date DATE)
CLUSTERED BY (key) INTO 3 BUCKETS
STORED AS ORC
LOCATION 's3://${hivevar:location}/acid_tbl' 
TBLPROPERTIES ('transactional'='true');

This script generates three partitions in the provided Amazon S3 path. See the following screenshot.

The first partition, trans_date=2020-08-01, has the data generated as a result of sample INSERT, UPDATE, DELETE, and MERGE statements. We use the second and third partitions when explaining minor and major compactions later in this post.

ACID is achieved in Apache Hive using three types of files: base, delta, and delete_delta. Edits are written in delta and delete_delta files.

The base file is created by the Insert Overwrite Table query or as the result of major compaction over a partition, where all the files are consolidated into a single base_<write id> file, where the write ID is allocated by the Hive transaction manager for every write. This helps achieve isolation of Hive write queries and enables them to run in parallel.

The INSERT operation creates a new delta_<write id>_<write id> directory.

The DELETE operation creates a new delete_delta_<write id>_<write id> directory.

To support deletes, a unique row__id is added to each row on writes. When a DELETE statement runs, the corresponding row__id gets added to the delete_delta_<write id>_<write id> directory, which should be ignored on reads. See the following screenshot.

The UPDATE operation creates a new delta_<write id>_<write id> directory and a delete_delta_<write id>_<write id> directory.

The following screenshot shows the second partition in Amazon S3, trans_date=2020-08-02.

A Hive transaction provides snapshot isolation for reads. When an application or query reads the transaction table, it opens all the files of a partition/bucket and returns the records from the last transaction committed.

Hive compactions

With the previously mentioned logic for Hive writes on a transactional table, many small delta and delete_delta files are created, which could adversely impact read performance over time because each read over a particular partition has to open all the files (including delete_delta) to eliminate the deleted rows.

This brings the need for a compaction logic for Hive transactions. In the following sections, we use the same use case to explain minor and major compactions in Hive.

Minor compaction

A minor compaction merges all the delta and delete_delta files within a partition or bucket to a single delta_<start write id>_<end write id> and delete_delta_<start write id>_<end write id> file.

We can trigger the minor compaction manually for the second partition (trans_date=2020-08-02) in Amazon S3 with the following code:

ALTER TABLE acid_tbl PARTITION (trans_date='2020-08-02') COMPACT 'minor';

If you check the same second partition in Amazon S3, after a minor compaction, it looks like the following screenshot.

You can see all the delta and delete_delta files from write ID 0000005–0000009 merged to single delta and delete_delta files, respectively.

Major compaction

A major compaction merges the base, delta, and delete_delta files within a partition or bucket to a single base_<latest write id>. Here the deleted data gets cleaned.

A major compaction is automatically triggered in the third partition (trans_date='2020-08-03') because the default Amazon EMR compaction threshold is met, as described in the next section. See the following screenshot.

To check the progress of compactions, enter the following command:

hive> show compactions;

The following screenshot shows the output.

Compaction in Amazon EMR

Compaction is enabled by default in Amazon EMR 6.1.0. The following property determines the number of concurrent compaction tasks:

  • hive.compactor.worker.threads – Number of worker threads to run in the instance. The default is 1 or vCores/8, whichever is greater.

Automatic compaction is triggered in Amazon EMR 6.1.0 based on the following configuration parameters:

  • hive.compactor.check.interval – Time period in seconds to check if any partition requires compaction. The default is 300 seconds.
  • hive.compactor.delta.num.threshold – Triggers minor compaction when the total number of delta files is greater than this value. The default is 10.
  • hive.compactor.delta.pct.threshold – Triggers major compaction when the total size of delta files is greater than this percentage size of base file. The default is 0.1, or 10%.
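
As one illustration of where these properties are set, the following boto3 sketch launches a cluster with the Hive ACID settings shown earlier in this post plus overridden compaction thresholds. The instance types, counts, roles, Region, and threshold values are placeholders to adapt, not recommendations.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

emr.run_job_flow(
    Name="hive-acid-cluster",
    ReleaseLabel="emr-6.1.0",
    Applications=[{"Name": "Hive"}, {"Name": "Tez"}],
    Configurations=[
        {
            "Classification": "hive-site",
            "Properties": {
                # Hive ACID settings from earlier in this post
                "hive.support.concurrency": "true",
                "hive.exec.dynamic.partition.mode": "nonstrict",
                "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
                # Example compaction tuning (placeholder values)
                "hive.compactor.check.interval": "120",
                "hive.compactor.delta.num.threshold": "20",
                "hive.compactor.delta.pct.threshold": "0.2",
            },
        }
    ],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)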

Best practices

The following are some best practices when using this feature:

  • Use an external Hive metastore for Hive ACID tables – Our customers use EMR clusters for compute purposes and Amazon S3 as storage for cost-optimization. With this architecture, you can stop the EMR cluster when the Hive jobs are complete. However, if you use a local Hive metastore, the metadata is lost upon stopping the cluster, and the corresponding data in Amazon S3 becomes unusable. To persist the metastore, we strongly recommend using an external Hive metastore like an Amazon RDS for MySQL instance or Amazon Aurora. Also, if you need multiple EMR clusters running ACID transactions (read or write) on the same Hive table, you need to use an external Hive metastore.
  • Use ORC format – Use ORC format to get full ACID support for INSERT, UPDATE, DELETE, and MERGE statements.
  • Partition your data – This technique helps improve performance for large datasets.
  • Enable an EMRFS consistent view if using Amazon S3 as storage – Because you have frequent movement of files in Amazon S3, we recommend using an EMRFS consistent view to mitigate the issues related to the eventual consistency nature of Amazon S3.
  • Use Hive authorization – Because Hive transactional tables are Hive managed tables, to prevent users from deleting data in Amazon S3, we suggest implementing Hive authorization with required privileges for each user.

Limitations

Keep in mind the following limitations of this feature:

  • The AWS Glue Data Catalog doesn’t support Hive ACID transactions.
  • Hive external tables don’t support Hive ACID transactions.
  • Bucketing is optional in Hive 3, but in Amazon EMR 6.1.0 (as of this writing), if the table is partitioned, it needs to be bucketed. You can mitigate this issue in Amazon EMR 6.1.0 using the following bootstrap action:
    --bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/hive-acid-blog/make_bucketing_optional_for_hive_acid_EMR_6_1.sh","Name":"Set bucketing as optional for Hive ACID"}]'

Conclusion

This post introduced the Hive ACID feature in EMR 6.1.0 clusters, explained how it works and its concepts with a straightforward use case, described the default behavior of Hive ACID on Amazon EMR, and offered some best practices. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.


About the Authors

Suthan Phillips is a big data architect at AWS. He works with customers to provide architectural guidance and help them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Chao Gao is a Software Development Engineer at Amazon EMR. He mainly works on the Apache Hive project at EMR and has in-depth knowledge of distributed databases and database internals. In his spare time, he enjoys making road trips, visiting all the national parks, and traveling around the world.

Zoopla drives KPIs with centralized data using Fivetran ELT for Amazon Redshift

Post Syndicated from Steven Collings original https://aws.amazon.com/blogs/big-data/zoopla-drives-kpis-with-centralized-data-using-fivetran-elt-for-amazon-redshift/

This is a guest post by Steven Collings, Senior Data Consultant at Zoopla

Zoopla is a property website that enables users to find residential or commercial property to buy or rent in the UK and overseas. Since acquiring Property Software Group and Expert Agent, we also offer backend software that agents can use to build their businesses. Amid this growth and these acquisitions, we needed a way to bring together data from disparate systems, such as Salesforce and NetSuite, in Amazon Redshift to drive our key performance indicators (KPIs).

Building a flexible and scalable data warehouse with Amazon Redshift

Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse solution that makes it simple and cost-effective to efficiently analyze all of your data using your existing business intelligence tools. We have used Amazon Redshift as our data warehouse for more than 5 years and have developed deep knowledge of the AWS analytics stack. Amazon Redshift has always performed well for us and integrates with other services we rely on, such as Amazon Simple Storage Service (Amazon S3), AWS Glue, and Microsoft Power BI, among others. Importantly, Amazon Redshift has evolved along with our needs. For example, we adopted Amazon Redshift Spectrum to query data directly in our Amazon S3 data lake so that we can scale efficiently from a cost and performance perspective, and easily combine data in the warehouse and the lake. In general, we’re pleased that AWS has continuously allowed us to scale and move forward.
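
As a rough illustration of that pattern, a Spectrum external schema lets a single query join data in the lake with data in the warehouse. The schema, database, IAM role, and table names below are hypothetical and not Zoopla's actual setup:

-- Register an AWS Glue Data Catalog database as an external schema (names are placeholders)
CREATE EXTERNAL SCHEMA spectrum_lake
FROM DATA CATALOG
DATABASE 'lake_db'
IAM_ROLE 'arn:aws:iam::123456789012:role/MySpectrumRole';

-- Join fine-grained lake data with a warehouse table in one query
SELECT a.account_id, COUNT(*) AS events
FROM spectrum_lake.page_events e   -- external table over Amazon S3
JOIN analytics.accounts a ON a.account_id = e.account_id
GROUP BY a.account_id;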

Complicated custom scripts and disparate data

We had custom-built scripts pulling data into Amazon Redshift from different places, including NetSuite and Salesforce. These scripts were built by different people, often in different languages, and were not documented. Each one required maintenance to keep up with changes to source systems and APIs. We wanted a solution that would help us integrate data more quickly and efficiently, using less developer time.

In addition to custom scripts, we were using native connectors from Power BI to shortcut data straight into reports. We were integrating data too high up in the stack to be able to reuse it in the ways that we wanted to. A proliferation of Power BI models was causing data to become siloed, and we ended up with a series of point solutions. We wanted our data centralized in our Amazon Redshift data warehouse so we could ensure its quality, join it together, and create enterprise data models.

We recognized that feeding the data directly into Power BI wasn't scalable. Power BI has a key role in our data stack for dashboarding and self-service analytics, and we wanted to keep our use of the tool squarely in its sweet spot. We didn't want to push every piece of fine-grained data into Power BI just so we could use it for a deep-dive analysis. Not only would this approach be expensive, it would also have performance implications and reduce the freedom of our analyst community to use the best tool for the job. It made more sense to have that data in Amazon Redshift (as our existing data warehouse solution), a platform that is well suited for running fine-grained, large-scale analyses using whichever tool best suits the use case.

Fivetran for automated data pipelines

We selected Fivetran to ingest the data. Fivetran replicates data from applications, databases, events, and files into Amazon Redshift. Fivetran connectors deploy in minutes, require zero maintenance, and automatically adjust to source changes so our data team can stop worrying about engineering and focus on driving insights. With Fivetran bringing data into Amazon Redshift, we have increased data quality and can easily integrate new datasets.

Freeing up engineering resources

Due to competing priorities for data engineering resources, my team faced a reduced level of support. With Fivetran, we could push ahead and make progress while working with fewer resources. We enabled existing members of the BI team to perform data integration tasks that previously required engineering effort (such as importing new sources, modifying existing sources, and implementing data cleansing and shaping logic), and freed up our data engineers to apply their skill set to value-add activities beyond maintaining data pipelines.

We estimate that Fivetran currently does the work of up to one full-time engineer, and we expect that number to increase. We’re interested in adding more sources that aren’t being integrated at the moment (such as campaign performance or customer helpdesk), which will increase the number of engineering hours that Fivetran saves us.

Building out comprehensive KPIs

One of the biggest drivers for bringing on Fivetran was a project that required centralizing NetSuite and Salesforce data for a large KPI project. We had a custom-built Salesforce connector but we didn’t have the skill set on the team to maintain it, and we didn’t want to spend development resources when we could buy it off the shelf.

The project entailed building a KPI overview for the senior leadership team. The weekly dashboard monitors about 40 different KPIs and metrics across Sales, Product, Marketing, Financials, HR, and other departments. It’s constantly available to the senior leadership team and allows them to understand overall business performance and also drill down into areas of concern that require further investigation and analysis. A streamlined version of the dashboard is displayed on screens around the office so that everyone feels informed and connected to our mission.

While some of these KPIs were already available, they were spread around different systems, lived in different reports, or were never even surfaced. If they were attainable, the process was often manual and prone to errors. This has been the key deliverable. It was always in our mind that we didn’t want to build a point solution. We wanted to ensure that all the data we were landing could be leveraged for other purposes, and we wanted to make this data available in a self-service capacity. By providing faster, simpler access to data, we enable quicker, more informed decision-making and open up the next wave of questions as people understand what is possible.

Conclusion

By centralizing data in our existing Amazon Redshift data warehouse, using Fivetran to automate data ingestion, and building dashboards with Power BI, we've created a consistent and efficient analytics process. It has saved our team time and ensured that we can continue to deliver valuable insights to our stakeholders.

Learn more about Zoopla, Fivetran and Amazon Redshift.


About the Author

Steven Collings is a Data Consultant (formerly Head of Data) at Zoopla, with 15 years' experience in data storage, ETL, data modelling, and reporting and visualisation techniques and technologies.