Tag Archives: Analytics

Amazon Managed Streaming for Apache Kafka (MSK) – Now Generally Available

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/amazon-managed-streaming-for-apache-kafka-msk-now-generally-available/

I am always amazed at how our customers are using streaming data. For example, Thomson Reuters, one of the world’s most trusted news organizations for businesses and professionals, built a solution to capture, analyze, and visualize analytics data to help product teams continuously improve the user experience. Supercell, the social game company providing games such as Hay Day, Clash of Clans, and Boom Beach, is delivering in-game data in real-time, handling 45 billion events per day.

Since we launched Amazon Kinesis at re:Invent 2013, we have continually expanded the ways in in which customers work with streaming data on AWS. Some of the available tools are:

  • Kinesis Data Streams, to capture, store, and process data streams with your own applications.
  • Kinesis Data Firehose, to transform and collect data into destinations such as Amazon S3, Amazon Elasticsearch Service, and Amazon Redshift.
  • Kinesis Data Analytics, to continuously analyze data using SQL or Java (via Apache Flink applications), for example to detect anomalies or for time series aggregation.
  • Kinesis Video Streams, to simplify processing of media streams.

At re:Invent 2018, we introduced in open preview Amazon Managed Streaming for Apache Kafka (MSK), a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data.

I am excited to announce that Amazon MSK is generally available today!

How it works

Apache Kafka (Kafka) is an open-source platform that enables customers to capture streaming data like click stream events, transactions, IoT events, application and machine logs, and have applications that perform real-time analytics, run continuous transformations, and distribute this data to data lakes and databases in real time. You can use Kafka as a streaming data store to decouple applications producing streaming data (producers) from those consuming streaming data (consumers).

While Kafka is a popular enterprise data streaming and messaging framework, it can be difficult to setup, scale, and manage in production. Amazon MSK takes care of these managing tasks and makes it easy to set up, configure, and run Kafka, along with Apache ZooKeeper, in an environment following best practices for high availability and security.

Your MSK clusters always run within an Amazon VPC managed by the MSK service. Your MSK resources are made available to your own VPC, subnet, and security group through elastic network interfaces (ENIs) which will appear in your account, as described in the following architectural diagram:

Customers can create a cluster in minutes, use AWS Identity and Access Management (IAM) to control cluster actions, authorize clients using TLS private certificate authorities fully managed by AWS Certificate Manager (ACM), encrypt data in-transit using TLS, and encrypt data at rest using AWS Key Management Service (KMS) encryption keys.

Amazon MSK continuously monitors server health and automatically replaces servers when they fail, automates server patching, and operates highly available ZooKeeper nodes as a part of the service at no additional cost. Key Kafka performance metrics are published in the console and in Amazon CloudWatch. Amazon MSK is fully compatible with Kafka versions 1.1.1 and 2.1.0, so that you can continue to run your applications, use Kafka’s admin tools, and and use Kafka compatible tools and frameworks without having to change your code.

Based on our customer feedback during the open preview, Amazon MSK added may features such as:

  • Encryption in-transit via TLS between clients and brokers, and between brokers
  • Mutual TLS authentication using ACM private certificate authorities
  • Support for Kafka version 2.1.0
  • 99.9% availability SLA
  • HIPAA eligible
  • Cluster-wide storage scale up
  • Integration with AWS CloudTrail for MSK API logging
  • Cluster tagging and tag-based IAM policy application
  • Defining custom, cluster-wide configurations for topics and brokers

AWS CloudFormation support is coming in the next few weeks.

Creating a cluster

Let’s create a cluster using the AWS management console. I give the cluster a name, select the VPC I want to use the cluster from, and the Kafka version.

I then choose the Availability Zones (AZs) and the corresponding subnets to use in the VPC. In the next step, I select how many Kafka brokers to deploy in each AZ. More brokers allow you to scale the throughtput of a cluster by allocating partitions to different brokers.

I can add tags to search and filter my resources, apply IAM policies to the Amazon MSK API, and track my costs. For storage, I leave the default storage volume size per broker.

I select to use encryption within the cluster and to allow both TLS and plaintext traffic between clients and brokers. For data at rest, I use the AWS-managed customer master key (CMK), but you can select a CMK in your account, using KMS, to have further control. You can use private TLS certificates to authenticate the identity of clients that connect to your cluster. This feature is using Private Certificate Authorities (CA) from ACM. For now, I leave this option unchecked.

In the advanced setting, I leave the default values. For example, I could have chosen here a different instance type for my brokers. Some of these settings can be updated using the AWS CLI.

I create the cluster and monitor the status from the cluster summary, including the Amazon Resource Name (ARN) that I can use when interacting via CLI or SDKs.

When the status is active, the client information section provides specific details to connect to the cluster, such as:

  • The bootstrap servers I can use with Kafka tools to connect to the cluster.
  • The Zookeper connect list of hosts and ports.

I can get similar information using the AWS CLI:

  • aws kafka list-clusters to see the ARNs of your clusters in a specific region
  • aws kafka get-bootstrap-brokers --cluster-arn <ClusterArn> to get the Kafka bootstrap servers
  • aws kafka describe-cluster --cluster-arn <ClusterArn> to see more details on the cluster, including the Zookeeper connect string

Quick demo of using Kafka

To start using Kafka, I create two EC2 instances in the same VPC, one will be a producer and one a consumer. To set them up as client machines, I download and extract the Kafka tools from the Apache website or any mirror. Kafka requires Java 8 to run, so I install Amazon Corretto 8.

On the producer instance, in the Kafka directory, I create a topic to send data from the producer to the consumer:

bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> \
--replication-factor 3 --partitions 1 --topic MyTopic

Then I start a console-based producer:

bin/kafka-console-producer.sh --broker-list <BootstrapBrokerString> \
--topic MyTopic

On the consumer instance, in the Kafka directory, I start a console-based consumer:

bin/kafka-console-consumer.sh --bootstrap-server <BootstrapBrokerString> \
--topic MyTopic --from-beginning

Here’s a recording of a quick demo where I create the topic and then send messages from a producer (top terminal) to a consumer of that topic (bottom terminal):

Pricing and availability

Pricing is per Kafka broker-hour and per provisioned storage-hour. There is no cost for the Zookeeper nodes used by your clusters. AWS data transfer rates apply for data transfer in and out of MSK. You will not be charged for data transfer within the cluster in a region, including data transfer between brokers and data transfer between brokers and ZooKeeper nodes.

You can migrate your existing Kafka cluster to MSK using tools like MirrorMaker (that comes with open source Kafka) to replicate data from your clusters into a MSK cluster.

Upstream compatibility is a core tenet of Amazon MSK. Our code changes to the Kafka platform are released back to open source.

Amazon MSK is available in US East (N. Virginia), US East (Ohio), US West (Oregon), Asia Pacific (Tokyo), Asia Pacific (Singapore), Asia Pacific (Sydney), EU (Frankfurt), EU (Ireland), EU (Paris), and EU (London).

I look forward to see how are you going to use Amazon MSK to simplify building and migrating streaming applications to the cloud!

Tourists on GrabChat!

Post Syndicated from Grab Tech original https://engineering.grab.com/tourist-chat-data-story

Just over two years ago we introduced GrabChat, Southeast Asia’s first of its kind in-app messaging platform. Since then we’ve added all sorts of useful features to it. Auto-translated messages, the ability to send photos, and even voice messages! It’s been a great tool to facilitate smoother communications between our driver-partners and our passengers, and one group in particular has found it incredibly useful: tourists!

Now, we’ve analysed tourist data before, but we were curious about how GrabChat in particular has served this demographic. So we looked for interesting insights using sampled tourist chat data from Singapore, Malaysia, and Indonesia for the period of December 2018 to March 2019. That’s more than 3.7 million individual GrabChat messages sent by tourists! Here’s what we found.

Average chats per booking per country

Looking at the volume of the chats being transmitted per booking, we can see that the “chattiest” tourists are from East Timor, Nigeria, and Ukraine with averages of 6.0, 5.6, and 5.1 chats per booking respectively.

Then we wondered: if tourists from all over the world are talking this much to our driver-partners, how are they actually communicating if their mother-tongue is not the local language?

Need a Translator?

When we go to another country, we eat all the heavenly good food, fall in love with the culture, and admire the scenery. Language and communication barriers shouldn’t get in the way of all of that. That’s why Grab’s Chat feature has got it covered!

With Grab’s in-house translation solutions, any Grab passenger can send messages in their preferred language without fear of being misunderstood – or not understood at all! Their messages will be automatically translated into Bahasa Indonesia, Bahasa Melayu, Simplified Chinese, Thai, or Vietnamese depending on where they are. This applies not only apply to Grab’s transport services- GrabChat can be used when ordering GrabFood too!

Percentage of translated GrabChat messages
Indonesia saw the highest usage of translations on a by-booking basis!


Let’s look deeper into the tourist translation statistics for each country with the donut charts below. We can see that the most popular translation route for tourists in Indonesia was from English to Indonesian. The story is different for Singapore and Malaysia: we can see that there are translations to and from a more diverse set of languages, reflecting a more multicultural demographic.

Percentage of translated GrabChat messages
The most popular translation routes for tourist bookings in Indonesia, Malaysia, and Singapore.


Tap for Templates!

GrabChat also provides achat template feature. Templates are prewritten messages that you can send with just one tap! Did we mention that they are translated automatically too? Passengers and drivers can have a fast, simple, and translated conversation with each other without typing a single word- and sometimes, templates are really all you need.

Examples of chat templates, as they appear in GrabChat!
Examples of chat templates, as they appear in GrabChat!


As if all this wasn’t convenient enough, you can also make your own custom templates! Use them for those repetitive, identical messages you always seem to be sending out like telling your drivers where the hotel lobby is, or how to navigate right to your doorstep, or even to send a quick description of what you look like to make it easier for a driver to find you!

Template message usage

Taking a look at individual country data, tourists in Indonesia used templates the most with almost 60% of all of them using a template in their conversations at least once. Malaysia and Singapore saw lower but still sizeable utilisation rates of this feature, at 53% and 33% respectively.

Template message usage percentage
Indonesia saw the highest usage of templates on a by-booking basis.


In our analysis, we found an interesting insight! There was a positive correlation between template usage and the success rate of rides. Overall, bookings that used templates in their conversations saw 10% more completions over bookings that didn’t.

Template vs completed bookings

Picture this: a hassle-free experience

A picture says a thousand words, and for tourists using GrabChat’s image feature, those thousand words don’t even need to be translated. Instead of typing out a description of where they are standing for pickup, they can just click, snap, and send an image!

Our data revealed that GrabChat’s image functionality is most frequently used in areas where the tourist traffic is the highest. In fact, image function in GrabChat saw the most use in pickup areas such as airports, large shopping malls, public transport stations, and hotels, because it was harder for drivers to find their passengers in these crowded areas. Even with our super convenient Entrances feature, every little bit of information goes a long way to help your driver find you!

Pickup locations

If we take it a step further and look at the actual areas  within the cities where images were sent the most, we see that our initial hypothesis still holds fast.

Pickup locations
The top 5 pickup areas per country in which images were the most prevalent in GrabChat (for tourists).


In Singapore, we see the most images being sent out at the Downtown Core area- this area contains the majestic Marina Bay Sands, the Merlion statue, and the Esplanade, amongst other iconic attractions.

In Malaysia, the highest image usage occurs at none other than the Kuala Lumpur City Centre (KLCC) itself. This area includes the Twin Towers, a plethora of malls and hotels, Bukit Bintang (a bustling and lively night-life zone), and even an aquarium.

Indonesia’s top location for image chats is Kuta. A beach village in Bali, Kuta is a tourist hotspot with surfing, water parks, bars, budget-friendly yet delicious food, and numerous cultural attractions.

Speak up!

Allowing for two-way communication via GrabChat empowers both passengers and drivers to improve their journeys by divulging useful information, and asking clarifying questions: how many bags do you have? Does your car accommodate my pet dog? I’m standing by the lobby with my two kids- these are the sorts of things that are talked about in GrabChat messages.

During the analysis of our multitudes of wide-ranging GrabChat conversations, we picked up some pro-tips for you to get a Grab ride with even more convenience and ease, whether you’re a tourist or not:

Tip #1: Did some shopping on your trip? Swamped with bags? Send a message to your driver to let them know how many pieces of luggage you have with you.

As one might expect, chats that have keywords such as “luggage” or “baggage” (or any other related term) occur the most when riders are going to, or leaving, an airport. Most of the tourists on GrabChat asked the drivers if there was space for all of their things in the car. Interestingly, some of them also told the drivers how to recognise them for pickup based off of the descriptions of their bags!

Tip #2: Your children make good landmarks! If you’re in a crowded spot and you’re worried your driver can’t find you, drop them a message to let them know you’re that family with a baby and a little girl in pigtails.

When it comes to children, we found that passengers mainly use them to help identify themselves to the driver. Messages like “I’m with my two kids” or “We are a family with a baby” came up numerous times, and served as descriptions to facilitate fast pickup. These sorts of chats were the most prevalent in crowded areas like airports and shopping centres.

Tip #3: Don’t get caught off guard- be sure your furry friends have a seat!

Taking a look at pet related chats, we learned that our tourists have used GrabChat to ask clarifying questions to the driver. Passengers have likely considered that not every driver or vehicle is accommodating towards animals. The most common type of message was about whether pets are allowed in the vehicle. For example: “Is it okay if I bring a puppy?” or “I have a dog with me in a carrier, is that alright?”. Better safe than sorry! Alternatively, if you’re travelling with a pet, why not see if GrabPet is available in your country?

From the chat content analysis we have learned that tourists do indeed use GrabChat to talk to their drivers about specific details of their trip. We see that the chat feature is an invaluable tool that anyone can use to clear up any ambiguities and make their journeys more pleasant.

Bubble Tea Craze on GrabFood!

Post Syndicated from Grab Tech original https://engineering.grab.com/bubble-tea-craze-on-grabfood

Bigger and More Bubble Tea!

Bubble tea orders on GrabFood has been constantly and dramatically increasing with an impressive regional average growth rate of 3,000% in the year of 2018!  Just look at the percentage increase over the year of 2018, across all countries!

CountriesBubble tea growth by percentage in 2018*
Indonesia>8500% growth from Jan 2018 to Dec 2018
Philippines>3,500% growth from June 2018 to Dec 2018
Thailand>3,000% growth from Jan 21018 to Dec 2018
Vietnam>1,500% growth from May 2018 to Dec 2018
Singapore>700% growth from May 2018 to Dec 2018
Malaysia>250% growth from May 2018 to Dec 2018

*Time period: January 2018 to December 2018, or from the time GrabFood was launched.

What’s driving this growth is not just die-hard bubble tea fans who can’t go a week without drinking this sweet treat, but a growing bubble tea fan club in Southeast Asia. The number of bubble tea lovers on GrabFood grew over 12,000% in 2018 – and there’s no sign of stopping!

With increasing consumer demand, how is Southeast Asia’s bubble tea supply catching up?  As of December 2018, GrabFood has close to 4,000 bubble tea outlets from a network of over 1,500 brands – a 200% growth in bubble tea outlets in Southeast Asia!

Bubble-Tea-Lover growth on GrabFood

If this stat doesn’t stick, here is a map to show you how much bubble tea orders in different Southeast Asian cities have grown!

Maps of bubble tea merchants on GrabFood

And here is a little shoutout to our star merchants including Chatime, Coco Fresh Tea & Juice, Macao Imperial Tea, Ochaya, Koi Tea, Cafe Amazon, The Alley, iTEA, Gong Cha, and Serenitea.

Just how much do you drink?

On average, Southeast Asians drink  4 cups of bubble tea per person per month on GrabFood. Thai consumers top the regional average by 2 cups, consuming about six cups of bubble tea per person per month. This is closely followed by Filipino consumers who drink an average of 5 cups per person per month.

Average bubble tea consumption by cups per person per month

Favourite Flavours!

Have a look at the dazzling array of Bubble Tea flavours available on GrabFood today and you’ll find some uniquely Southeast Asian flavours like Chendol, Durian, and Gula Melaka, as well as rare flavours like salted cream and cheese! Can you spot your favourite flavours here?

Bubble tea flavour consumption per month

Let’s break it down by the country that GrabFood serves, and see who likes which flavours of Bubble Tea more!

Bubble tea flavour consumption per month by country

Top the Toppings!

Pearl seems to be the unbeatable best topping of most of the countries, except Vietnam whose No. 1 topping turned out to be Cheese Pudding! Top 3 toppings that topped your favorite bubble tea are:

Top list of toppings

Best Time for Bubble Tea!

Don’t we all need a cup of sweet Bubble Tea in the afternoon to get us through the day?  Across Southeast Asia, GrabFood’s data reveals that most people order bubble tea to accompany their meals at lunch, or as a  perfect midday energizer!

Times of the day when most people order bubble tea


So hazelnut or chocolate, pearl or (and) pudding (who says we can’t have the best of both worlds!)? The options are abundant and the choice is yours to enjoy!

If you have a sweet tooth, or simply want to reward yourself with Southeast Asia’s most popular drink, go ahead – you are only a couple of taps away from savouring this cup full of delight

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed only specifying a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices.  The data then is typically stored in a data lake, so it can be processed and eventually queried.  When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder.  This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date.  Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date.  However, the naming of folders in Amazon S3 is not compatible with Apache Hive naming conventions. This makes data more difficult to catalog using AWS Glue crawlers and analyze using big data tools.

This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, the intended use cases, and includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static Universal Coordinated Time (UTC) based folder structure in the format YYYY/MM/DD/HH. It then appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”.  However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”.  Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.

Other methods for managing partitions also become possible such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders.  This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time. It also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure improving the time to insight.

How custom prefixes for Amazon S3 objects works

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation or format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form: !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace or a date pattern for the timestamp namespace in the Java DateTimeFormatter format. In a single expression, you can use a combination of the two namespaces although the !{firehose: error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive compatible folder structure.  It then shows how AWS Glue crawlers can infer the schema and extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in AWS Glue Data Catalog.  Finally, we run sample queries to show that partitions are indeed being recognized.

To demonstrate this, we use python code to generate sample data.  We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures. This demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is included here in GitHub.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example for your own.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

The incoming events have an ApproximateArrivalTimestamp field in the event payload.  This is sufficient to create a proper folder structure on Amazon S3.  However, when querying the data it may be beneficial to expose this timestamp value as a top level column for easy filtering and validation.  To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top level field in the data payload. The data payload is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code also artificially generates some processing errors that are delivered to the “ErrorOutputPrefix” location specified for the delivery destination to illustrate the use of expressions in the “ErrorOutputPrefix.”

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called LambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step3. Delivery Stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In the previous configuration, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.

We’ve chosen a prefix that will result in a folder structure compatible with hive-style partitioning. This is the prefix we used:


Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to year, month, day and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549754078390 in UNIX epoch time, which is 2019-02-09T16:13:01.000000Z in UTC would evaluate to “year=2019”, “month=02”, “day=09” and “hour=16”.  Therefore, the location in Amazon S3 where data records that are delivered evaluate to “fhbase/year=2019/month=02/day=09/hour=16/”.

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11 character random string like “ztWxkdg3Thg”.  If you use this more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for an Amazon ES destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Or Create the delivery stream using the AWS Console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose if you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.

  1. Choose the destination. I chose the Amazon S3 destination.

  1. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.

  1. Specify the Amazon S3 Prefix and the Amazon S3 error prefix. This corresponds to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.

  1. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a different prefix from the transformed records – the base folder is different but the folder structure below that is the same. This would make it more efficient to crawl this location using an AWS Glue crawler or create external tables in Athena or Redshift Spectrum pointing to this location.

  1. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  2. Choose the S3 Compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  3. Choose whether you want to enable Error Logging in Cloudwatch. I chose Enabled.
  4. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose either Create new or Choose to display a new screen. Choose Create a new IAM role, name the role, and then choose Allow.
  5. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

 Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because the Lambda code in my Lambda transform set the status failed for 10 percent of the records, those showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add Crawler.

  1. Add information about your crawler, then choose Next.
  2. In the Include Path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  3. Choose Next.

  1. Choose Next, No, Next.
  2. Specify the IAM role that AWS Glue would use. I chose to create a new IAM Role. Choose Next.
  3. Specify a schedule to run the crawler. I chose to Run it on Demand. Choose Next.
  4. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.

  1. Choose Finish.
  1. The crawler has been created and is ready to be run. Choose Run crawler.

  1. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"

where year = '2019' and day = '12' and hour = '17'

order by approxarrtimestamputcfh desc

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. The query can take advantage of the partitions in the query to filter the results.


As this post illustrates, Custom Prefixes for Amazon S3 objects provides much flexibility to customize the folder structure, where Kinesis Data Firehose delivers the data records and failure records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps get insight more expediently and helps you better manage the cost of your queries.


About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.





Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/build-and-run-streaming-applications-with-apache-flink-and-amazon-kinesis-data-analytics-for-java-applications/

Stream processing facilitates the collection, processing, and analysis of real-time data and enables the continuous generation of insights and quick reactions to emerging situations. This capability is useful when the value of derived insights diminishes over time. Hence, the faster you can react to a detected situation, the more valuable the reaction is going to be. Consider, for instance, a streaming application that analyzes and blocks fraudulent credit card transactions while they occur. Compare that application to a traditional batch-oriented approach that identifies fraudulent transactions at the end of every business day and generates a nice report for you to read the next morning.

It is quite common for the value of insights to diminish over time. Therefore, using stream processing can substantially improve the value of your analytics application. However, building and operating a streaming application that continuously receives and processes data is much more challenging than operating a traditional batch-oriented analytics application.

In this post, we discuss how you can use Apache Flink and Amazon Kinesis Data Analytics for Java Applications to address these challenges. We explore how to build a reliable, scalable, and highly available streaming architecture based on managed services that substantially reduce the operational overhead compared to a self-managed environment. We particularly focus on how to prepare and run Flink applications with Kinesis Data Analytics for Java Applications. To this end, we use an exemplary scenario that includes source code and AWS CloudFormation templates. You can follow along with this example using your own AWS account or adapt the code according to your specific requirements.

Challenges of running streaming applications

When you build a streaming application, the downstream systems naturally rely on a continuous and timely generation of output. Accordingly, there are much higher requirements on the availability of the streaming application. There is also much less time to address operational issues compared to a traditional batch-based approach. In a batch-processing scenario, when a job that runs once at the end of a business day fails, you can often restart the failed job and still complete the computation by the next morning, when the results are needed. In contrast, when a streaming application fails, downstream systems that consume the output might be affected within minutes, or even sooner, when the expected output no longer arrives in time.

Moreover, in case of failure, you can’t simply delete all intermediate results and restart a failed processing job, as it is commonly done in the batch-processing case. The output of a streaming job is continuously consumed by downstream systems. Output that has already been consumed cannot easily be retracted. Therefore, the entire processing pipeline is much more sensitive to duplicates that are introduced by an application that is restarted on failure. Furthermore, the computations of a streaming application often rely on some kind of internal state that can be corrupted or even lost when the application fails.

Last but not least, streaming applications often deal with a varying amount of throughput. Therefore, scaling the application according to the current load is highly desirable. When the load increases, the infrastructure that supports the streaming application must scale to keep the application from becoming overloaded, falling behind, and producing results that are no longer relevant. On the other hand, when the load decreases, the infrastructure should scale in again to remain cost effective by not provisioning more resources than are needed.

A reliable and scalable streaming architecture based on Flink and Kinesis Data Analytics for Java Applications

Apache Flink is an open-source project that is tailored to stateful computations over unbounded and bounded datasets. Flink addresses many of the challenges that are common when analyzing streaming data by supporting different APIs (including Java and SQL), rich time semantics, and state management capabilities. It can also recover from failures while maintaining exactly-once processing semantics. Therefore, Flink is well suited for analyzing streaming data with low latency.

In this post, we illustrate how to deploy, operate, and scale a Flink application with Kinesis Data Analytics for Java Applications. We use a scenario to analyze the telemetry data of a taxi fleet in New York City in near-real time to optimize the fleet operation. In this scenario, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into a Kinesis data stream as a simple JSON blob. From there, the data is processed by a Flink application, which is deployed to Kinesis Data Analytics for Java Applications. This application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

This scenario leads to the following architecture, which is separated into three stages for the ingestion, processing, and presentation of data.

Separating the different aspects of the infrastructure is a common approach in this domain and has several benefits over a more tightly coupled architecture.

First, the Kinesis data stream serves as a buffer that decouples the producers from the consumers. Taxis can persist the events that they generate into the data stream regardless of the condition of, for instance, the processing layer, which might be currently recovering from a node failure. Likewise, the derived data remains available through Kibana even if the ingestion or processing layer is currently unavailable due to some operational issues. Last but not least, all components can be scaled independently and can use infrastructure that is specifically tailored according to their individual requirements.

This architecture also allows you to experiment and adopt new technologies in the future. Multiple independent applications can concurrently consume the data stored in the Kinesis data stream. You can then test how a new version of an existing application performs with a copy of the production traffic. But you can also introduce a different tool and technology stack to analyze the data, again without affecting the existing production application. For example, it is common to persist the raw event data to Amazon S3 by adding a Kinesis Data Firehose delivery stream as a second consumer to the Kinesis data stream. This facilitates long-term archiving of the data, which you can then use to evaluate ad hoc queries or analyze historic trends.

All in all, separating the different aspects of the architecture into ingestion, processing, and presentation nicely decouples different components, making the architecture more robust. It furthermore allows you to choose different tools for different purposes and gives you a lot of flexibility to change or evolve the architecture over time.

For the rest of this post, we focus on using Apache Flink and Kinesis Data Analytics for Java Applications to identify areas that currently request a high number of taxi rides. We also derive the average trip duration to the New York City airports. But with this architecture, you also have the option to consume the incoming events using other tools, such as Apache Spark Structured Streaming and Kinesis Data Firehose, instead of, or in addition to, what is described here.

Let’s kick the tires!

To see the described architecture in action, execute the following AWS CloudFormation template in your own AWS account. The template first builds the Flink application that analyzes the incoming taxi trips, including the Flink Kinesis Connector that is required to read data from a Kinesis data stream. It then creates the infrastructure and submits the Flink application to Kinesis Data Analytics for Java Applications.

The entire process of building the application and creating the infrastructure takes about 20 minutes. After the AWS CloudFormation stack is created, the Flink application has been deployed as a Kinesis Data Analytics for Java application. It then waits for events in the data stream to arrive. Checkpointing is enabled so that the application can seamlessly recover from failures of the underlying infrastructure while Kinesis Data Analytics for Java Applications manages the checkpoints on your behalf. In addition, automatic scaling is configured so that Kinesis Data Analytics for Java Applications automatically allocates or removes resources and scales the application (that is, it adapts its parallelism) in response to changes in the incoming traffic.

To populate the Kinesis data stream, we use a Java application that replays a public dataset of historic taxi trips made in New York City into the data stream. The Java application has already been downloaded to an Amazon EC2 instance that was provisioned by AWS CloudFormation. You just need to connect to the instance and execute the JAR file to start ingesting events into the stream.

You can obtain all of the following commands, including their correct parameters, from the output section of the AWS CloudFormation template that you executed previously.

$ ssh [email protected]«Replay instance DNS name»

$ java -jar amazon-kinesis-replay-1.0.jar -stream «Kinesis data stream name» -region «AWS region» -speedup 3600

The speedup parameter determines how much faster the data is ingested into the Kinesis data stream relative to the actual occurrence of the historic events. With the given parameters, the Java application ingests an hour of historic data within one second. This results in a throughput of roughly 13k events and 6 MB of data per second, which completely saturates the Kinesis data stream (more on this later).

You can then go ahead and inspect the derived data through the Kibana dashboard that has been created. Or you can create your own visualizations to explore the data in Kibana.

https://«Elasticsearch endpoint»/_plugin/kibana/app/kibana#/dashboard/nyc-tlc-dashboard

The prepared Kibana dashboard contains a heatmap and a line graph. The heatmap visualizes locations where taxis are currently requested, and it shows that the highest demand for taxis is Manhattan. Moreover, the JFK and LaGuardia airports are also spots on the map where substantially more rides are requested compared to their direct neighborhoods. The line graph visualizes the average trip duration to these two airports. The following image shows how it steadily increases throughout the day until it abruptly drops in the evening.

For this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the AWS CloudFormation template. For production workloads, it’s much more desirable to further tighten the security of your Elasticsearch domain, for instance, by using Amazon Cognito for Kibana access control.

Scaling the architecture to increase its throughput

For this post, the Kinesis data stream was deliberately underprovisioned so that the Java application is completely saturating the data stream. When you closely inspect the output of the Java application, you’ll notice that the “replay lag” is continuously increasing. This means that the producer cannot ingest events as quickly as it is required according to the specified speedup parameter.

You can dive deeper into the metrics of the data stream by accessing it through an Amazon CloudWatch Dashboard. You can then see that the WriteProvisionedThroughputExceeded metric is slightly increased: Roughly 0.4 percent of the records are not accepted into the stream as the respective requests are throttled. In other terms, the data stream is underprovisioned, in particular as the producer pauses the ingestion of new events when too many events are in flight.

To increase the throughput of the data stream, you can simply update the number of shards from 6 to 12 with a couple of clicks on the console and through an API call, respectively. For production environments, you might even want to automate this procedure. For details on how to automatically scale a Kinesis data stream, see the blog post Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling.

When the scaling operation of the stream finishes, you can observe how the “replay lag” decreases and more events are ingested into the stream.

However, as a direct result, more events need to be processed. So now the Kinesis Data Analytics for Java application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch. The metric reports the time difference between the oldest record currently read by the Kinesis Data Analytics for Java application and the latest record in the stream according to the ingestion time in milliseconds. So it indicates how much behind the processing is from the tip of the stream.

As these metrics show, 10 minutes after the scaling operation finishes, processing is already more than 3 minutes behind the latest event in the stream. Even worse, it steadily keeps falling behind, continuously widening this gap.

However, in contrast to Kinesis Data Streams, Kinesis Data Analytics for Java Applications natively supports auto scaling. After a couple of minutes, you can see the effect of the scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

However, notice how the millisBehindLatest metric spikes just before it starts to decline. This is caused by the way that scaling a Kinesis Data Analytics for Java application works today. To scale a running application, the internal state of the application is persisted into a so-called savepoint. This savepoint is exposed as a snapshot by Kinesis Data Analytics for Java Applications. Subsequently, the running instance of the application is terminated, and a new instance of the same application with more resources and a higher parallelism is created. The new instance of the application then populates its internal state from the snapshot and resumes the processing from where the now terminated instance left off.

Accordingly, the scaling operation causes a brief interruption of the processing, which explains the spike in metric. However, this operation is transparent to the producers and consumers. Producers can continue to write to the Kinesis data stream because they are nicely decoupled from the application. Likewise, consumers can still use Kibana to view their dashboards, although they might not see the latest data because it hasn’t yet been processed.

Let’s step back for a moment and review what you just did: You created a fully managed, highly available, scalable streaming architecture. You ingested and analyzed up to 25k events per second. You doubled the throughput of the architecture by scaling the Kinesis data stream and the Kinesis Data Analytics for Java application with a couple of clicks. You did all this while the architecture remained fully functional and kept receiving and processing events, not losing a single event. You also could have scaled the Elasticsearch cluster as seamlessly as the other components. But we’ll leave that as an exercise for the interested reader.

Try to imagine what it would have taken you to build something similar from scratch.

Prepare Flink applications for Kinesis Data Analytics for Java Applications

Now that you have seen the streaming application in action, let’s look at what is required to deploy and run a Flink application with Kinesis Data Analytics for Java Applications.

Similar to other deployment methods, the Flink application is first built and packaged into a fat JAR, which contains all the necessary dependencies for the application to run. The resulting fat JAR is then uploaded to Amazon S3. The location of the fat JAR on S3 and some additional configuration parameters are then used to create an application that can be executed by Kinesis Data Analytics for Java Applications. So instead of logging in to a cluster and directly submitting a job to the Flink runtime, you upload the respective fat JAR to S3. You then create a Kinesis Data Analytics for Java application that you can interact with using API calls, the console, and the AWS CLI, respectively.

Adapt the Flink configuration and runtime parameters

To obtain a valid Kinesis Data Analytics for Java application, the fat JAR of the Flink application must include certain dependencies. When you use Apache Maven to build your Flink application, you can simply add another dependency to the .pom file of your project.

<!—pom.xml ->

You can then specify parameters that are passed to the resulting Kinesis Data Analytics for Java application when it is created or updated. These parameters are basically key-value pairs that are contained in a property map that is part of a property group.

"ApplicationConfiguration": {
    "EnvironmentProperties": {
        "PropertyGroups": [
                "PropertyGroupId": "FlinkApplicationProperties",
                "PropertyMap": {
                    "InputStreamName": "...",

You can then obtain the values of these parameters in the application code from the Kinesis Data Analytics for Java Applications runtime. For example, the following code snippet gets the name of the Kinesis data stream that the application should connect to from the FlinkApplicationProperties property group.

Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();

Properties flinkProperties = applicationProperties.get("FlinkApplicationProperties");

String kinesisStreamName = flinkProperties.getProperty("InputStreamName");

You use the same mechanism to configure other properties for the Kinesis Data Analytics for Java application (for example, checkpointing and the parallelism of the application) that are usually specified as a parameter or configuration option directly to the Flink runtime.

"ApplicationConfiguration": {
    "FlinkApplicationConfiguration": {
        "CheckpointConfiguration": {
            "ConfigurationType": "DEFAULT"
        "MonitoringConfiguration": {
            "ConfigurationType": "CUSTOM",
            "MetricsLevel": "TASK",
            "LogLevel": "INFO"
        "ParallelismConfiguration": {
            "ConfigurationType": "DEFAULT"

With this configuration, the checkpointing and parallelism settings are left at their default. This enables checkpointing and auto scaling and sets the initial parallelism of the Kinesis Data Analytics for Java application to one. Moreover, the log level is increased to INFO and CloudWatch metrics are collected for every subtask of the application.

Build the Flink Kinesis Connector

When you are building a Flink application that reads data from a Kinesis data stream, you might notice that the Flink Kinesis Connector is not available from Maven central. You actually need to build it yourself. The following steps build the connector for any recent Apache Flink release. However, because Kinesis Data Analytics for Java Applications is based on Flink 1.6.2, you can use this specific version for now.

$ wget -qO- https://github.com/apache/flink/archive/release-1.6.2.zip | bsdtar -xf-

$ cd flink-release-1.6.2

$ mvn clean package -B -DskipTests -Dfast -Pinclude-kinesis -pl flink-connectors/flink-connector-kinesis

Note that the connector has already been built and stored on S3 by the AWS CloudFormation template. You can simply download the JAR file of the connector from there and put it in your local Maven repository using the following Maven command:

$ mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.6.2.jar -DpomFile flink-connector-kinesis_2.11-1.6.2.pom.xml

Integrate the Flink Elasticsearch sink with Amazon Elasticsearch Service

Beginning with the 1.6 release, Apache Flink comes with an Elasticsearch connector that supports the Elasticsearch APIs over HTTP. Therefore, it can natively talk to the endpoints that are provided by Amazon Elasticsearch Service.

You just need to decide how to authenticate requests against the public endpoint of the Elasticsearch cluster. You can whitelist individual IPs for access to the cluster. However, the recommended way of authenticating against the Amazon Elasticsearch Service endpoint is to add authentication information to AWS requests using IAM credentials and the Signature Version 4 signing process.

To extend the Flink Elasticsearch connector, which is not aware of the AWS specific signing process, you can use the open-source aws-signing-request-interceptor, which is available from Maven central. You just need to add an interceptor to the Elasticsearch sink that is called just before the request is sent to the Amazon Elasticsearch Service endpoint. The interceptor can then sign the request using the permission of the role that has been configured for the Kinesis Data Analytics for Java application.

final List<HttpHost> httpHosts = Arrays.asList(HttpHost.create("https://...")));

ElasticsearchSink.Builder<T> esSinkBuilder = new ElasticsearchSink.Builder<>(
    new ElasticsearchSinkFunction<T>() {

final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "eu-west-1", "es", clock);

    restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(
        callback -> callback.addInterceptorLast(new AWSSigningRequestInterceptor(awsSigner))


Note that the actual code in the GitHub repository is a bit more sophisticated because you need to obtain a serializable request interceptor. But the basic approach to sign requests remains the same.

Monitor and debug the Flink application

When running a Kinesis Data Analytics for Java application, you don’t get direct access to the cluster that runs Flink. This is because the underlying infrastructure is completely managed by the service. You merely interact with the service through an API. However, you can still obtain metrics and logging information through CloudWatch and CloudWatch Logs, respectively.

The Kinesis Data Analytics for Java application exposes a lot of operational metrics, ranging from metrics for the entire application down to metrics for individual processes of operators of the application. You can control which level of detail is adequate or required for your purposes. In fact, the metrics used in the previous section were all obtained through CloudWatch.

In addition to operational metrics, you can configure the Kinesis Data Analytics for Java application to write messages to CloudWatch Logs. This capability seamlessly integrates with common logging frameworks, such as Apache Log4j and the Simple Logging Facade for Java (SLF4J). So it is useful for debugging and identifying the cause of operational issues.

To enable logging for your Kinesis Data Analytics for Java application, just specify an existing CloudWatch log stream as a logging option when you start the application, as follows:

final Logger LOG = LoggerFactory.getLogger(...);

LOG.info("Starting to consume events from stream {}", flinkProperties.getProperty("InputStreamName"));

After the log messages are persisted into CloudWatch Logs, you can easily query and analyze them through CloudWatch Logs Insights


In this post, you not only built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics for Java Applications. You also scaled the different components while ingesting and analyzing up to 25k events per second in near-real time. In large parts, this scenario was enabled by using managed services, so you didn’t need to spend time on provisioning and configuring the underlying infrastructure.

The sources of the application and the AWS CloudFormation template used in this post are available from GitHub for your reference. You can dive into all the details of the Flink application and the configuration of the underlying services. I’m curious to see what you will build when you can focus on analyzing data in a streaming fashion rather than spending time on managing and operating infrastructure.


About the Author

Steffen Hausmann is a specialist solutions architect with AWS.





Improve clinical trial outcomes by using AWS technologies

Post Syndicated from Mayank Thakkar original https://aws.amazon.com/blogs/big-data/improve-clinical-trial-outcomes-by-using-aws-technologies/

We are living in a golden age of innovation, where personalized medicine is making it possible to cure diseases that we never thought curable. Digital medicine is helping people with diseases get healthier, and we are constantly discovering how to use the body’s immune system to target and eradicate cancer cells. According to a report published by ClinicalTrials.gov, the number of registered studies hit 293,000 in 2018, representing a 250x growth since 2000.

However, an internal rate of return (IRR) analysis by Endpoints News, using data from EvaluatePharma, highlights some interesting trends. A flourishing trend in pharma innovation is supported by strong growth in registered studies. However, the IRR shows a rapidly declining trend, from around 17 percent in 2000 to below the cost of capital in 2017 and projected to go to 0 percent by 2020.

This blog post is the first installment in a series that focuses on the end-to-end workflow of collecting, storing, processing, visualizing, and acting on clinical trials data in a compliant and secure manner. The series also discusses the application of artificial intelligence and machine learning technologies to the world of clinical trials. In this post, we highlight common architectural patterns that AWS customers use to modernize their clinical trials. These incorporate mobile technologies for better evidence generation, cost reduction, increasing quality, improving access, and making medicine more personalized for patients.

Improving the outcomes of clinical trials and reducing costs

Biotech and pharma organizations are feeling the pressure to use resources as efficiently as possible. This pressure forces them to search for any opportunity to streamline processes, get faster, and stay more secure, all while decreasing costs. More and more life sciences companies are targeting biologics, CAR-T, and precision medicine therapeutics, with focus shifting towards smaller, geographically distributed patient segments. This shift has resulted in an increasing mandate to capture data from previously unavailable, nontraditional sources. These sources include mobile devices, IoT devices, and in-home and clinical devices. Life sciences companies merge data from these sources with data from traditional clinical trials to build robust evidence around the safety and efficacy of a drug.

Early last year, the Clinical Trials Transformation Initiative (CTTI) provided recommendations about using mobile technologies for capturing holistic, high quality, attributable, real-world data from patients and for submission to the U.S. Food and Drug Administration (FDA). By using mobile technologies, life sciences companies can reduce barriers to trial participation and lower costs associated with conducting clinical trials. Global regulatory groups such as the FDA, Health Canada, and Medicines and Healthcare products Regulatory Agency (MHRA), among others, are also in favor of using mobile technologies. Mobile technologies can make patient recruitment more efficient, reach endpoints faster, and reduce the cost and time required to conduct clinical trials.

Improvised data ingestion using mobile technologies can speed up outcomes, reduce costs, and improve the accuracy of clinical trials. This is especially true when mobile data ingestion is supplemented with artificial intelligence and machine learning (AI/ML) technologies.

Together, they can usher in a new age of smart clinical trials.

At the same time, traditional clinical trial processes and technology designed for mass-marketed blockbuster drugs can’t effectively meet emerging industry needs. This leaves life sciences and pharmaceutical companies in need of assistance for evolving their clinical trial operations. These circumstances result in making clinical trials one of the largest areas of investment for bringing a new drug to market.

Using mobile technologies with traditional technologies in clinical trials can improve the outcomes of the trials and simultaneously reduce costs. Some of the use cases that the integration of various technologies enables include these:

  • Identifying and tracking participants in clinical trials
    • Identifying participants for clinical trials recruitment
    • Educating and informing patients participating in clinical trials
    • Implementing standardized protocols and sharing associated information to trial participants
    • Tracking adverse events and safety profiles
  • Integrating genomic and phenotypic data for identifying novel biomarkers
  • Integrating mobile data into clinical trials for better clinical trial management
  • Creation of a patient-control arm based on historical data
  • Stratifying cohorts based on treatment, claims, and registry datasets
  • Building a collaborative, interoperable network for data sharing and knowledge creation
  • Building compliance-ready infrastructure for clinical trial management

The AWS Cloud provides HIPAA eligible services and solutions. As an AWS customer, you can use these to build solutions for global implementation of mobile devices and sensors in trials, secure capture of streaming Internet of Things (IoT) data, and advanced analytics through visualization tools or AI/ML capabilities. Some of the use cases these services and solutions enable are finding and recruiting patients using smart analytics, facilitating global data management, and remote or in-patient site monitoring. Others include predicting lack of adherence, detecting adverse events, and accelerating trial outcomes along with optimizing trial costs.

Clinical Trials 2.0 (CT2.0) at AWS is geared toward facilitating wider adoption of cloud-native services to enable data ingestion from disparate sources, cost-optimized and reliable storage, and holistic analytics. At the same time, CT2.0 provides the granular access control, end-to-end security, and global scalability needed to conduct clinical trials more efficiently.

Reference architecture

One of the typical architectures for managing a clinical trial using mobile technologies is shown following. This architecture focuses on capturing real-time data from mobile sources and providing a way to process it.

* – Additional considerations such as data security, access control, and compliance need to be incorporated into the architecture and are discussed in the remainder of this post.

Managing a trial by using this architecture consists of the following five major steps.

Step 1: Collect data

Mobile devices, personal wearables, instruments, and smart-devices are extensively being used (or being considered) by global pharmaceutical companies in patient care and clinical trials to provide data for activity tracking, vital signs monitoring, and so on, in real-time. Devices like infusion pumps, personal use dialysis machines, and so on require tracking and alerting of device consumables and calibration status. Remote settings management is also a major use case for these kinds of devices. The end-user mobile devices used in the clinical trial emit a lot of telemetry data that requires real-time data capture, data cleansing, transformation, and analysis.

Typically, these devices are connected to an edge node or a smart phone. Such a connection provides sufficient computing resources to stream data to AWS IoT Core. AWS IoT Core can then be configured to write data to Amazon Kinesis Data Firehose in near real time. Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. S3 provides online, flexible, cost efficient, pay-as-you-go storage that can replicate your data on three Availability Zones within an AWS Region. The edge node or smart phone can use the AWS IoT SDKs to publish and subscribe device data to AWS IoT Core with MQTT. This process uses the AWS method of authentication (called ‘SigV4’), X.509 certificate–based authentication, and customer-created token-based authentication (through custom authorizers). This authenticated approach enables you to map your choice of policies to each certificate and remotely control device or application access. You can also use the Kinesis Data Firehose encryption feature to enable server-side data encryption.

You can also capture additional data such as Case Report Forms (CRF), Electronic Medical Records (EMR), and medical images using Picture Archiving and Communication Systems (PACS). In addition, you can capture laboratory data (Labs) and other Patient Reported Outcomes data (ePRO). AWS provides multiple tools and services to effectively and securely connect to these data sources, enabling you to ingest data in various volumes, variety, and velocities. For more information about creating a HealthCare Data Hub and ingesting Digital Imaging and Communications in Medicine (DICOM) data, see the AWS Big Data Blog post Create a Healthcare Data Hub with AWS and Mirth Connect.

Step 2: Store data

After data is ingested from the devices and wearables used in the clinical trial, Kinesis Data Firehose is used to store the data on Amazon S3. This stored data serves as a raw copy and can later be used for historical analysis and pattern prediction. Using Amazon S3’s lifecycle policies, you can periodically move your data to reduced cost storage such as Amazon S3 Glacier for further optimizing their storage costs. Using Amazon S3 Intelligent Tiering can automatically optimize costs when data access patterns change, without performance impact or operational overhead by moving data between two access tiers—frequent access and infrequent access. You can also choose to encrypt data at rest and in motion using various encryption options available on S3.

Amazon S3 offers an extremely durable, highly available, and infinitely scalable data storage infrastructure, simplifying most data processing, backup, and replication tasks.

Step 3: Data processingfast lane

After collecting and storing a raw copy of the data, Amazon S3 is configured to publish events to AWS Lambda and invoke a Lambda function by passing the event data as a parameter. The Lambda function is used to extract the key performance indicators (KPIs) such as adverse event notifications, medication adherence, and treatment schedule management from the incoming data. You can use Lambda to process these KPIs and store them in Amazon DynamoDB, along with encryption at rest, which powers a near-real-time clinical trial status dashboard. This alerts clinical trial coordinators in real time so that appropriate interventions can take place.

In addition to this, using a data warehouse full of medical records, you can train and implement a machine learning model. This model can predict which patients are about to switch medications or might exhibit adherence challenges in the future. Such prediction can enable clinical trial coordinators to narrow in on those patients with mitigation strategies.

Step 4: Data processing—batch

For historical analysis and pattern prediction, the staged data (stored in S3) is processed in batches. A Lambda function is used to trigger the extract, transform, and load (ETL) process every time new data is added to the raw data S3 bucket. This Lambda function triggers an ETL process using AWS Glue, a fully managed ETL service that makes it easy for you to prepare and load your data for analytics. This approach helps in mining current and historical data to derive actionable insights, which is stored on Amazon S3.

From there, data is loaded on to Amazon Redshift, a cost-effective, petabyte-scale data warehouse offering from AWS. You can also use Amazon Redshift Spectrum to extend data warehousing out to exabytes without loading any data to Amazon Redshift, as detailed in the Big Data blog post Amazon Redshift Spectrum Extends Data Warehousing Out to Exabytes—No Loading Required. This enables you to provide an all-encompassing picture of the entire clinical trial to your clinical trial coordinators, enabling you to react and respond faster.

In addition to this, you can train and implement a machine learning model to identify patients who might be at risk for adherence challenges. This enables clinical trial coordinators to reinforce patient education and support.

Step 5: Visualize and act on data

After the data is processed and ready to be consumed, you can use Amazon QuickSight, a cloud-native business intelligence service from AWS that offers native Amazon Redshift connectivity. Amazon QuickSight is serverless and so can be rolled out to your audiences in hours. You can also use a host of third-party reporting tools, which can use AWS-supplied JDBC or ODBC drivers or open-source PostgreSQL drivers to connect with Amazon Redshift. These tools include TIBCO Spotfire Analytics, Tableau Server, Qlik Sense Enterprise, Looker, and others. Real-time data processing (step 3 preceding) combines with historical-view batch processing (step 4). Together, they empower contract research organizations (CROs), study managers, trial coordinators, and other entities involved in the clinical trial journey to make effective and informed decisions at a speed and frequency that was previously unavailable. Using Amazon QuickSight’s unique Pay-per-Session pricing model, you can optimize costs for your bursty usage models by paying only when users access the dashboards.

Using Amazon Simple Notification Service (Amazon SNS), real-time feedback based on incoming data and telemetry is sent to patients by using text messages, mobile push, and emails. In addition, study managers and coordinators can send Amazon SNS notifications to patients. Amazon SNS provides a fully managed pub/sub messaging for micro services, distributed systems, and serverless applications. It’s designed for high-throughput, push-based, many-to-many messaging. Alerts and notifications can be based on current data or a combination of current and historical data.

To encrypt messages published to Amazon SNS, you can follow the steps listed in the post Encrypting messages published to Amazon SNS with AWS KMS, on the AWS Compute Blog.   

Data security, data privacy, data integrity, and compliance considerations

At AWS, customer trust is our top priority. We deliver services to millions of active customers, including enterprises, educational institutions, and government agencies in over 190 countries. Our customers include financial services providers, healthcare providers, and governmental agencies, who trust us with some of their most sensitive information.

To facilitate this, along with the services mentioned earlier, you should also use AWS Identity and Access Management (IAM) service. IAM enables you to maintain segregation of access, fine-grained access control, and securing end user mobile and web applications. You can also use AWS Security Token Service (AWS STS) to provide secure, self-expiring, time-boxed, temporary security credentials to third-party administrators and service providers, greatly strengthening your security posture. You can use AWS CloudTrail to log IAM and STS API calls. Additionally, AWS IoT Device Management makes it easy to securely onboard, organize, monitor, and remotely manage IoT devices at scale.

With AWS, you can add an additional layer of security to your data at rest in the cloud. AWS provides scalable and efficient encryption features for services like Amazon EBS, Amazon S3, Amazon Redshift, Amazon SNSAWS Glue, and many more. Flexible key management options, including AWS Key Management Service, enable you to choose whether to have AWS manage the encryption keys or to keep complete control over their keys. In addition, AWS provides APIs for you to integrate encryption and data protection with any of the services that you develop or deploy in an AWS environment.

As a customer, you maintain ownership of your data, and select which AWS services can process, store, and host the content. Generally speaking, AWS doesn’t access or use customers’ content for any purpose without their consent. AWS never uses customer data to derive information for marketing or advertising.

When evaluating the security of a cloud solution, it’s important that you understand and distinguish between the security of the cloud and security in the cloud. The AWS Shared Responsibility Model details this relationship.

To assist you with your compliance efforts, AWS continues to add more services to the various compliance regulations, attestations, certifications, and programs across the world. To decide which services are suitable for you, see the services in scope page.

You can also use various services like, but not limited to, AWS CloudTrail, AWS Config, Amazon GuardDuty, and AWS Key Management Service (AWS KMS) to enhance your compliance and auditing efforts. Find more details in the AWS Compliance Solutions Guide.

Final thoughts

With the ever-growing interconnectivity and technological advances in the field of medical devices, mobile devices and sensors can improve numerous aspects of clinical trials. They can help in recruitment, informed consent, patient counseling, and patient communication management. They can also improve protocol and medication adherence, clinical endpoints measurement, and the process of alerting participants on adverse events. Smart sensors, smart mobile devices, and robust interconnecting systems can be central in conducting clinical trials.

Every biopharma organization conducting or sponsoring a clinical trial activity faces the conundrum of advancing their approach to trials while maintaining overall trial performance and data consistency. The AWS Cloud enables a new dimension for how data is collected, stored, and used for clinical trials. It thus addresses that conundrum as we march towards a new reality of how drugs are brought to market. The AWS Cloud abstracts away technical challenges such as scaling, security, and establishing a cost-efficient IT infrastructure. In doing so, it allows biopharma organizations to focus on their core mission of improving patent lives through the development of effective, groundbreaking treatments.


About the Author

Mayank Thakkar – Global Solutions Architect, AWS HealthCare and Life Sciences





Deven Atnoor, Ph.D. – Industry Specialist, AWS HealthCare and Life Sciences





Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

Post Syndicated from Karunanithi Shanmugam original https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

In the world of big data, a common use case is performing extract, transform (ET) and data analytics on huge amounts of data from a variety of data sources. Often, you then analyze the data to get insights. One of the most popular cloud-based solutions to process such vast amounts of data is Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS. Amazon EMR enables organizations to spin up a cluster with multiple instances in a matter of few minutes. It also enables you to process various data engineering and business intelligence workloads through parallel processing. By doing this, to a great extent you can reduce the data processing times, effort, and costs involved in establishing and scaling a cluster.

Apache Spark is a cluster-computing software framework that is open-source, fast, and general-purpose. It is widely used in distributed processing of big data. Apache Spark relies heavily on cluster memory (RAM) as it performs parallel computing in memory across nodes to reduce the I/O and execution times of tasks.

Generally, you perform the following steps when running a Spark application on Amazon EMR:

  1. Upload the Spark application package to Amazon S3.
  2. Configure and launch the Amazon EMR cluster with configured Apache Spark.
  3. Install the application package from Amazon S3 onto the cluster and then run the application.
  4. Terminate the cluster after the application is completed.

It’s important to configure the Spark application appropriately based on data and processing requirements for it to be successful. With default settings, Spark might not use all the available resources of the cluster and might end up with physical or virtual memory issues, or both. There are thousands of questions raised in stackoverflow.com related to this specific topic.

This blog post is intended to assist you by detailing best practices to prevent memory-related issues with Apache Spark on Amazon EMR.

Common memory issues in Spark applications with default or improper configurations

Listed following are a few sample out-of-memory errors that can occur in a Spark application with default or improper configurations.

Out of Memory Error, Java Heap Space

WARN TaskSetManager: Loss was due to 
java.lang.OutOfMemoryError: Java heap space

Out of Memory Error, Exceeding Physical Memory

Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
12.4 GB of 12.3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Error: ExecutorLostFailure Reason: Container killed by YARN for exceeding limits.
4.5GB of 3GB physical memory used limits.
Consider boosting spark.yarn.executor.memoryOverhead.

Out of Memory Error, Exceeding Virtual Memory

Container killed by YARN for exceeding memory limits.
1.1gb of 1.0gb virtual memory used. Killing container.

Out of Memory Error, Exceeding Executor Memory

Required executor memory (1024+384 MB) is above 
the max threshold (896 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb'
and/or 'yarn.nodemanager.resource.memory-mb

These issues occur for various reasons, some of which are listed following:

  1. When the number of Spark executor instances, the amount of executor memory, the number of cores, or parallelism is not set appropriately to handle large volumes of data.
  2. When the Spark executor’s physical memory exceeds the memory allocated by YARN. In this case, the total of Spark executor instance memory plus memory overhead is not enough to handle memory-intensive operations. Memory-intensive operations include caching, shuffling, and aggregating (using reduceByKey, groupBy, and so on). Or, in some cases, the total of Spark executor instance memory plus memory overhead can be more than what is defined in yarn.scheduler.maximum-allocation-mb.
  3. The memory required to perform system operations such as garbage collection is not available in the Spark executor instance.

In the following sections, I discuss how to properly configure to prevent out-of-memory issues, including but not limited to those preceding.

Configuring for a successful Spark application on Amazon EMR

The following steps can help you configure a successful Spark application on Amazon EMR.

1. Determine the type and number of instances based on application needs

Amazon EMR has three types of nodes:

  1. Master: An EMR cluster has one master, which acts as the resource manager and manages the cluster and tasks.
  2. Core: The core nodes are managed by the master node. Core nodes run YARN NodeManager daemons, Hadoop MapReduce tasks, and Spark executors to manage storage, execute tasks, and send a heartbeat to the master.
  3. Task: The optional task-only nodes perform tasks and don’t store any data, in contrast to core nodes.

Best practice 1: Choose the right type of instance for each of the node types in an Amazon EMR cluster. Doing this is one key to success in running any Spark application on Amazon EMR.

There are numerous instance types offered by AWS with varying ranges of vCPUs, storage, and memory, as described in the Amazon EMR documentation. Based on whether an application is compute-intensive or memory-intensive, you can choose the right instance type with the right compute and memory configuration.

For memory-intensive applications, prefer R type instances over the other instance types. For compute-intensive applications, prefer C type instances. For applications balanced between memory and compute, prefer M type general-purpose instances.

To understand the possible use cases for each instance type offered by AWS, see Amazon EC2 Instance Types on the EC2 service website.

After deciding the instance type, determine the number of instances for each of the node types. You do this based on the size of the input datasets, application execution times, and frequency requirements.

2. Determine the Spark configuration parameters

Before we dive into the details on Spark configuration, let’s get an overview of how the executor container memory is organized using the diagram following.

As the preceding diagram shows, the executor container has multiple memory compartments. Of these, only one (execution memory) is actually used for executing the tasks. These compartments should be properly configured for running the tasks efficiently and without failure.

Calculate and set the following Spark configuration parameters carefully for the Spark application to run successfully:

  • spark.executor.memory – Size of memory to use for each executor that runs the task.
  • spark.executor.cores – Number of virtual cores.
  • spark.driver.memory – Size of memory to use for the driver.
  • spark.driver.cores – Number of virtual cores to use for the driver.
  • spark.executor.instances ­– Number of executors. Set this parameter unless spark.dynamicAllocation.enabled is set to true.
  • spark.default.parallelism – Default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.

Amazon EMR provides high-level information on how it sets the default values for Spark parameters in the release guide. These values are automatically set in the spark-defaults settings based on the core and task instance types in the cluster.

To use all the resources available in a cluster, set the maximizeResourceAllocation parameter to true. This EMR-specific option calculates the maximum compute and memory resources available for an executor on an instance in the core instance group. It then sets these parameters in the spark-defaults settings. Even with this setting, generally the default numbers are low and the application doesn’t use the full strength of the cluster. For example, the default for spark.default.parallelism is only 2 x the number of virtual cores available, though parallelism can be higher for a large cluster.

Spark on YARN can dynamically scale the number of executors used for a Spark application based on the workloads. Using Amazon EMR release version 4.4.0 and later, dynamic allocation is enabled by default (as described in the Spark documentation).

The problem with the spark.dynamicAllocation.enabled property is that it requires you to set subproperties. Some example subproperties are spark.dynamicAllocation.initialExecutors, minExecutors, and maxExecutors. Subproperties are required for most cases to use the right number of executors in a cluster for an application, especially when you need multiple applications to run simultaneously. Setting subproperties requires a lot of trial and error to get the numbers right. If they’re not right, the capacity might be reserved but never actually used. This leads to wastage of resources or memory errors for other applications.

Best practice 2: Set spark.dynamicAllocation.enabled to true only if the numbers are properly determined for spark.dynamicAllocation.initialExecutors/minExecutors/maxExecutors parameters. Otherwise, set spark.dynamicAllocation.enabled to false and control the driver memory, executor memory, and CPU parameters yourself. To do this, calculate and set these properties manually for each application (see the example following).

Let’s assume that we are going to process 200 terabytes of data spread across thousands of file stores in Amazon S3. Further, let’s assume that we do this through an Amazon EMR cluster with 1 r5.12xlarge master node and 19 r5.12xlarge core nodes. Each r5.12xlarge instance has 48 virtual cores (vCPUs) and 384 GB RAM. All these calculations are for the --deploy-mode cluster, which we recommend for production use.

The following list describes how to set some important Spark properties, using the preceding case as an example.


Assigning executors with a large number of virtual cores leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, causing a larger amount of I/O operations. Based on historical data, we suggest that you have five virtual cores for each executor to achieve optimal results in any sized cluster.

For the preceding cluster, the property spark.executor.cores should be assigned as follows: spark.executors.cores = 5 (vCPU)


After you decide on the number of virtual cores per executor, calculating this property is much simpler. First, get the number of executors per instance using total number of virtual cores and executor virtual cores. Subtract one virtual core from the total number of virtual cores to reserve it for the Hadoop daemons.

Number of executors per instance = (total number of virtual cores per instance - 1)/ spark.executors.cores

Number of executors per instance = (48 - 1)/ 5 = 47 / 5 = 9 (rounded down)

Then, get the total executor memory by using the total RAM per instance and number of executors per instance. Leave 1 GB for the Hadoop daemons.

Total executor memory = total RAM per instance / number of executors per instance
Total executor memory = 383 / 9 = 42 (rounded down)

This total executor memory includes the executor memory and overhead (spark.yarn.executor.memoryOverhead). Assign 10 percent from this total executor memory to the memory overhead and the remaining 90 percent to executor memory.

spark.executors.memory = total executor memory * 0.90
spark.executors.memory = 42 * 0.9 = 37 (rounded down)

spark.yarn.executor.memoryOverhead = total executor memory * 0.10
spark.yarn.executor.memoryOverhead = 42 * 0.1 = 5 (rounded up)


We recommend setting this to equal spark.executors.memory.

spark.driver.memory = spark.executors.memory


We recommend setting this to equal spark.executors.cores.

spark.driver.cores= spark.executors.cores.


Calculate this by multiplying the number of executors and total number of instances. Leave one executor for the driver.

spark.executor.instances = (number of executors per instance * number of core instances) minus 1 for the driver

spark.executor.instances = (9 * 19) - 1 = 170


Set this property using the following formula.

spark.default.parallelism = spark.executor.instances * spark.executors.cores * 2

spark.default.parallelism = 170 * 5 * 2 = 1,700

Warning: Although this calculation gives partitions of 1,700, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.

In case of dataframes, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.

Though the preceding parameters are critical for any Spark application, the following parameters also help in running the applications smoothly to avoid other timeout and memory-related errors. We advise that you set these in the spark-defaults configuration file.

  • spark.network.timeout – Timeout for all network transactions.
  • spark.executor.heartbeatInterval – Interval between each executor’s heartbeats to the driver. This value should be significantly less than spark.network.timeout.
  • spark.memory.fraction – Fraction of JVM heap space used for Spark execution and storage. The lower this is, the more frequently spills and cached data eviction occur.
  • spark.memory.storageFraction – Expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory might be available to execution. This means that tasks might spill to disk more often.
  • spark.yarn.scheduler.reporterThread.maxFailures – Maximum number executor failures allowed before YARN can fail the application.
  • spark.rdd.compress – When set to true, this property can save substantial space at the cost of some extra CPU time by compressing the RDDs.
  • spark.shuffle.compress – When set to true, this property compresses the map output to save space.
  • spark.shuffle.spill.compress – When set to true, this property compresses the data spilled during shuffles.
  • spark.sql.shuffle.partitions – Sets the number of partitions for joins and aggregations.
  • spark.serializer – Sets the serializer to serialize or deserialize data. As a serializer, I prefer Kyro (org.apache.spark.serializer.KryoSerializer), which is faster and more compact than the Java default serializer.

To understand more about each of the parameters mentioned preceding, see the Spark documentation.

We recommend you consider these additional programming techniques for efficient Spark processing:

  • coalesce – Reduces the number of partitions to allow for less data movement.
  • repartition – Reduces or increases the number of partitions and performs full shuffle of data as opposed to coalesce.
  • partitionBy – Distributes data horizontally across partitions.
  • bucketBy – Decomposes data into more manageable parts (buckets) based on hashed columns.
  • cache/persist – Pulls datasets into a clusterwide in-memory cache. Doing this is useful when data is accessed repeatedly, such as when querying a small lookup dataset or when running an iterative algorithm.

Best practice 3: Carefully calculate the preceding additional properties based on application requirements. Set these properties appropriately in spark-defaults, when submitting a Spark application (spark-submit), or within a SparkConf object.

3. Implement a proper garbage collector to clear memory effectively

Garbage collection can lead to out-of-memory errors in certain cases. These include cases when there are multiple large RDDs in the application. Other cases occur when there is an interference between the task execution memory and RDD cached memory.

You can use multiple garbage collectors to evict the old objects and place the new ones into the memory. However, the latest Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations with the old garbage collectors.

Best practice 4: Always set up a garbage collector when handling large volume of data through Spark.

The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (The default is -XX:+UseParallelGC.) To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 0.45). Doing this helps avoid potential garbage collection for the total memory, which can take a significant amount of time. An example follows.

"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",

4. Set the YARN configuration parameters

Even if all the Spark configuration properties are calculated and set correctly, virtual out-of-memory errors can still occur rarely as virtual memory is bumped up aggressively by the OS. To prevent these application failures, set the following flags in the YARN site settings.

Best practice 5: Always set the virtual and physical memory check flag to false.


5. Perform debugging and monitoring

To get details on where the spark configuration options are coming from, you can run spark-submit with the –verbose option. Also, you can use Ganglia and Spark UI to monitor the application progress, Cluster RAM usage, Network I/O, etc.

In the following example, we compare the outcomes between configured and non-configured Spark applications using Ganglia graphs.

When configured following the methods described, a Spark application can process 10 TB data successfully without any memory issues on an Amazon EMR cluster whose specs are as follows:

  • 1 r5.12xlarge master node
  • 19 r5.12xlarge core nodes
  • 8 TB total RAM
  • 960 total virtual CPUs
  • 170 executor instances
  • 5 virtual CPUs/executor
  • 37 GB memory/executor
  • Parallelism equals 1,700

Following, you can find Ganglia graphs for reference.

If you run the same Spark application with default configurations on the same cluster, it fails with an out-of-physical-memory error. This is because the default configurations (two executor instances, parallelism of 2, one vCPU/executor, 8-GB memory/executor) aren’t enough to process 10 TB data. Though the cluster had 7.8 TB memory, the default configurations limited the application to use only 16 GB memory, leading to the following out-of-memory error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.5 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Also, for large datasets, the default garbage collectors don’t clear the memory efficiently enough for the tasks to run in parallel, causing frequent failures. The following charts help in comparing the RAM usage and garbage collection with the default and G1GC garbage collectors.With G1GC, the RAM used is maintained below 5 TB (see the blue area in the graph).

With the default garbage collector (CMS), the RAM used goes above 5 TB. This can lead to the failure of the Spark job when running many tasks continuously.

Example: EMR instance template with configuration

There are different ways to set the Spark and YARN configuration parameters. One of ways is to pass these when creating the EMR cluster.

To do this, in the Amazon EMR console’s Edit software settings section, you can enter the appropriately updated configuration template (Enter configuration). Or the configuration can be passed from S3 (Load JSON from S3).

Following is a configuration template with sample values. At a minimum, calculate and set the following parameters for a successful Spark application.

                 "Classification": "yarn-site",
                 "Properties": {
                   "yarn.nodemanager.vmem-check-enabled": "false",
                   "yarn.nodemanager.pmem-check-enabled": "false"
                 "Classification": "spark",
                 "Properties": {
                   "maximizeResourceAllocation": "false"
                 "Classification": "spark-defaults",
                 "Properties": {
                   "spark.network.timeout": "800s",
                   "spark.executor.heartbeatInterval": "60s",
                   "spark.dynamicAllocation.enabled": "false",
                   "spark.driver.memory": "21000M",
                   "spark.executor.memory": "21000M",
                   "spark.executor.cores": "5",
                   "spark.executor.instances": "171",
                   "spark.yarn.executor.memoryOverhead": "21000M",
                   "spark.yarn.driver.memoryOverhead": "21000M",
                   "spark.memory.fraction": "0.80",
                   "spark.memory.storageFraction": "0.30",
                   "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
                   "spark.yarn.scheduler.reporterThread.maxFailures": "5",
                   "spark.storage.level": "MEMORY_AND_DISK_SER",
                   "spark.rdd.compress": "true",
                   "spark.shuffle.compress": "true",
                   "spark.shuffle.spill.compress": "true",
                   "spark.default.parallelism": "3400"
                 "Classification": "mapred-site",
                 "Properties": {
                   "mapreduce.map.output.compress": "true"
                 "Classification": "hadoop-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Configurations": [],
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                 "Properties": {}
                 "Classification": "spark-env",
                 "Configurations": [{
                   "Classification": "export",
                   "Properties": {
                     "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                 "Properties": {}


In this blog post, I detailed the possible out-of-memory errors, their causes, and a list of best practices to prevent these errors when submitting a Spark application on Amazon EMR.

My colleagues and I formed these best practices after thorough research and understanding of various Spark configuration properties and testing multiple Spark applications. These best practices apply to most of out-of-memory scenarios, though there might be some rare scenarios where they don’t apply. However, we believe that this blog post provides all the details needed so you can tweak parameters and successfully run a Spark application.


About the Author

Karunanithi Shanmugam is a data engineer with AWS Tech and Finance.





Federate Amazon Redshift access with Okta as an identity provider

Post Syndicated from Rajiv Gupta original https://aws.amazon.com/blogs/big-data/federate-amazon-redshift-access-with-okta-as-an-identity-provider/

Managing database users and access can be a daunting and error-prone task. In the past, database administrators had to determine which groups a user belongs to and which objects a user/group is authorized to use. These lists were maintained within the database and could easily get disjointed from the corporate directory.

With federation, you can manage users and groups within the enterprise identity provider (IdP) and pass them to Amazon Redshift at login. In a previous post, Federate Database User Authentication Easily with IAM and Amazon Redshift, I discussed the internals of the federation workflow using Active Directory Federation Service (AD FS) as our identity provider.

In this post, I focus on Okta as the identity provider. I provide step-by-step guidance showing how you can set up a trial Okta.com account, build users and groups within your organization’s directory, and enable single sign-on (SSO) into Amazon Redshift. You can do all of this while also maintaining group-level access controls within your data warehouse.

The steps in this post are structured into the following sections:

  • Identity provider (Okta) configuration – You set up Okta, which contains your users organized into logical groups.
  • AWS configuration – You set up a role that establishes a trust relationship between your identity provider and AWS and a role that Okta uses to access Amazon Redshift.
  • Identity provider (Okta) advanced configuration – You finalize the Okta configuration by inputting the roles that you just created. You also inform Okta about which groups are allowed to be passed to Amazon Redshift.
  • Amazon Redshift server/client setup – You set up groups within the Amazon Redshift database to match the Okta groups. You also authorize these groups to access certain schemas and tables. Finally, you set up your client tools to use your enterprise credentials and sign in to Amazon Redshift.

Identity provider (Okta) configuration

In this first step, you set up Okta, add users, and organize them into logical groups. You then add the Amazon Web Services Redshift Okta application.

Step 1: Create an Okta account

If you don’t already have access to an Okta account, you can start a 30-day free trial: https://www.okta.com/free-trial/.

Step 2: Set up your Okta directory

Sign in to Okta.com using the following URL, where <prefix> is specific to your account and was created at account setup:


Navigate to the Directory page to add people and groups into Okta that match your organization. Be sure to use lowercase group names because Amazon Redshift expects the group names to be lowercase.

In the following example, I added three users and two groups, where one of the users (Jorge) belongs to both the “sales” and “marketing” groups.

First, choose Admin in the upper-right corner.

To add users, choose Add Person. The following example shows the users that were created.

To add groups into Okta, choose Add Group. The following example shows three groups.

Step 3: Add the “Amazon Web Services Redshift” Okta application

Navigate to the Applications page. Choose Add Application, and search for the Amazon Web Services Redshift application. Proceed with the default settings.

Step 4: Download the Okta application metadata

Make sure that you have navigated to the Amazon Web Services Redshift application’s settings page, which appears as follows.

Choose Sign On, and then choose the Identity Provider metadata link to download the metadata file in xml format (for example, metadata.xml).

AWS configuration

Next, you set up a role that establishes a trust relationship between the identity provider and AWS. You also create a role that Okta uses to access Amazon Redshift.

Step 5: Create the SAML IAM identity provider

Switching to AWS Management Console, sign in using your AWS credentials. Then open the AWS Identity and Access Management (IAM) console.

On the IAM console, choose Identity providers, and then choose Create Provider, as shown following.

Provide a name for your IdP, and upload the metadata file that you downloaded in the previous step.

Step 6: Create the IAM SAML 2.0 federation role

On the IAM console, navigate to Roles and create a new SAML 2.0 federation role.  Reference the IdP that you created in the previous step, and choose Allow programmatic and AWS Management Console access.

Step 7: Add other permissions to query Amazon Redshift

Choose Next: Assign Permissions. Then choose Create policy.

Create the following custom policy, replacing the region, account, and cluster parameters. These permissions allow the role to use Amazon Redshift to query data, create users, and allow users to join groups.

    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
           "Action": [
           "Resource": "arn:aws:redshift:<region>:<account>:cluster:<cluster>"}]

There are a few important things to note:

  • The group membership lasts only for the duration of the user session.
  • There is no CreateGroup permission because groups need to be manually created and granted DB privileges.

The following image shows the summary page for the role.

Identity provider (Okta) advanced configuration

In this section, you finalize the Okta configuration by adding the roles that you just created. You also tell Okta which groups are allowed to be passed to Amazon Redshift.

Step 8: Configure the advanced sign-on settings

Switch back to Okta.com. Navigate to the settings page for the Amazon Web Services Redshift application. In the Sign-On section, scroll to Advanced Sign-On Settings.

Enter the previously created IdP and role ARNS, which are globally unique and ensure that Okta will be directed to your AWS account. Allowed DB Groups is a list of allowed groups that will be sent to Amazon Redshift in the DBGroup SAML assertion.

Don’t use the asterisk (*) wildcard. This will cause the Everyone group to be passed, and Amazon Redshift will complain because it expects the group names to be lowercase.  Note that the ${user.username} is sent in the DBUser SAML assertion.

Step 9: Authorize users

Authorize users to use the Amazon Web Services Redshift application by selecting their respective groups or individual user accounts. In this example, I authorized users by group.

Amazon Redshift server/client setup

Next, you set up groups in the Amazon Redshift database to match the Okta groups. You also authorize these groups to access certain schemas and tables. Finally, you set up your client tools to use your enterprise credentials and sign in to Amazon Redshift.

Step 10: Set up groups

Log in to your Amazon Redshift cluster with an admin account. Create groups that match the IdP group names, and grant the appropriate permissions to tables and schemas.

CREATE GROUP marketing;
GRANT ALL on TABLES to GROUP marketing;

Step 11: Configure the JDBC SQL client

Assuming that the Amazon Redshift JDBC driver is installed, set up a new connection to your cluster using your IdP credentials. In the following example, I am using SQLWorkbenchJ. For the URL, be sure to enter “iam” to instruct the driver to authenticate using IAM. For Username and Password, enter the values that you set in Okta.

Enter the extended properties as follows. For app_id and idp_host, refer to the URL for the application in your web browser:


Step 12: Configure the ODBC SQL client

Assuming that the Amazon Redshift ODBC driver is installed, set up a new connection to your cluster using your IdP credentials. In the following example, I modified the ~/Library/ODBC/odbc.ini file.  See the previous instructions for determining the <app_id> and <prefix> values.

[ODBC Data Sources]
Redshift DSN=Installed

[Redshift DSN]

Step 13: Test user access

You should now be able to sign on with the users created. In our example, [email protected] has access to the tables in the “sales” schema only. The user [email protected] has access to tables in the “marketing” schema only. And [email protected] has access to tables in both schemas. Using the [email protected] user, you get following results when trying to query data from each of the schemas:

select storeid From sales.stores


select * From marketing.campaign

An error occurred when executing the SQL command:
select * From marketing.campaign

[Amazon](500310) Invalid operation: permission denied for schema marketing;
1 statement failed.

Execution time: 0.16s


In this post, I provided a step-by-step guide for configuring and using Okta as your Identity Provider (IdP) to enable single sign-on to an Amazon Redshift cluster. I also showed how group membership within your IdP can be passed along, enabling you to manage user access to Amazon Redshift resources from within your IdP.

If you have questions or suggestions, please comment below.


About the Author

Rajiv Gupta is a data warehouse specialist solutions architect with Amazon Web Services.





Granting fine-grained access to the Amazon Redshift Management Console

Post Syndicated from Raj Jayaraman original https://aws.amazon.com/blogs/big-data/granting-fine-grained-access-to-the-amazon-redshift-management-console/

As a fully managed service, Amazon Redshift is designed to be easy to set up and use. In this blog post, we demonstrate how to grant access to users in an operations group to perform only specific actions in the Amazon Redshift Management Console. If you implement a custom IAM policy, you can set it up so these users can monitor and terminate running queries. At the same time, you can prevent these users from performing other more privileged operations such as modifying, restarting, or deleting an Amazon Redshift cluster.

An overview of Amazon Redshift access control

Since its release in February 2013, Amazon Redshift has quickly become a popular cloud-based data warehousing platform for thousands of customers worldwide.

Access to Amazon Redshift requires credentials that AWS can use to authenticate your requests. Those credentials must have permissions to access Amazon Redshift resources, such as an Amazon Redshift cluster or a snapshot. For more details on these credentials, see Authentication and Access Control for Amazon Redshift in the Amazon Redshift documentation.

Every AWS resource is owned by an AWS account, and permissions to create or access the resources are governed by AWS Identity and Access Management (IAM) policies. An AWS account administrator can attach permissions policies to IAM identities (users, groups, and roles). In particular, an AWS account administrator can attach an IAM permissions policy to a specific user. Such a policy grants permissions for that user to manage an Amazon Redshift resource, such as a snapshot or an event subscription.

When granting permissions, you can decide who gets the permissions and which Amazon Redshift resources they get permissions for. You can also decide on the specific actions that you want to allow on those resources. Policies attached to an IAM identity are referred to as identity-based IAM policies, and policies attached to a resource are referred to as resource-based policies. Amazon Redshift supports only identity-based IAM policies.

Use case: Setting limited access for a user

Consider the following use case. Suppose that an IAM user who is a member of a customer’s operations group needs to monitor and terminate queries running in an Amazon Redshift cluster. It’s best if they do so through the Amazon Redshift console. This user is not allowed to modify or delete any other Amazon Redshift resources.

To implement this use case, we need to implement a custom IAM policy that ensures this IAM user has read-only access to the Amazon Redshift console. Doing this means that the user can get descriptions of the available clusters and navigate to the Queries tab. Additionally, we want the IAM user to be able to cancel a running query through the Amazon Redshift console. To allow this, we use the redshift:CancelQuerySession IAM action. For descriptions of other allowed Amazon Redshift actions in an IAM policy and what each action means, see Actions Defined by Amazon Redshift in the Amazon Redshift documentation.

To create such a custom IAM policy, follow these instructions:

  1. Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.
  2. In the navigation pane on the left, choose Policies.
  3. Choose Create policy.
  4. Choose the JSON tab and input the following policy:
    "Version": "2012-10-17",
    "Statement": [
            "Action": [
            "Effect": "Allow",
            "Resource": "*"

  1. On the Review policy page, type a value for Name and optionally for Description for the policy that you are creating. Review the policy Summary to see the permissions that are granted by your policy. Then choose Create policy to save your work.
  2. Attach this policy to an existing or a new IAM user.

With this permission policy, an IAM user can select an Amazon Redshift cluster, list all running queries in the Queries tab, and terminate a query if needed. All the permissions are read-only. Thus, the user can’t create a new Amazon Redshift cluster or modify or delete an existing cluster. However, the user can view available clusters, cluster snapshots, parameter groups, and cluster subnet groups, and view other properties of existing clusters.

Validating the use case

With the above IAM policy in place, after the IAM user logs into the Amazon Redshift Management Console, the user can select and view details about the Amazon Redshift cluster or clusters in the account. After navigating to the Queries tab, the user can see both the running and completed queries.

To cancel or terminate a long running query, the user can select the query from the list and choose Terminate Query. However, this user can’t modify or delete anything else in the Amazon Redshift console. As an example, if the user tries to modify an Amazon Redshift cluster (to change its endpoint), that user encounters the following error.


In this post, we have walked through a detailed customer use case of providing fine-grained access to the Amazon Redshift console. Using a set of carefully tailored IAM policies, a customer’s operations personnel can have read-only access to the Amazon Redshift console. These personnel can cancel or terminate running queries without the ability to modify, add, or delete any other Amazon Redshift resources.

We want to acknowledge our fellow AWS co-workers Ryan Mich, Sulay Shah and Hunter Grider for their many useful comments and suggestions.

If you have any questions or suggestions, leave your feedback in the comment section. If you need any further assistance to optimize your Amazon Redshift implementation, contact your AWS account team or a trusted AWS partner.


About the authors

Raj Jayaraman is a cloud support engineer with AWS Support at Amazon Web Services.





Po Hong, Ph.D. is a senior data architect within the Global Data & Analytics Specialty Practice at AWS Professional Services.





Recipe for building a widget: How we helped to “peak-shift” demand by helping passengers understand travel trends

Post Syndicated from Grab Tech original https://engineering.grab.com/peak-shift-demand-travel-trends

Credits: Photo by rawpixel on Unsplash


Stuck in traffic in a Grab ride? Pass the time by opening your Grab app and checking out the Feed – just scroll down! You’ll find widgets for games, polls, videos, news and even food recommendations!

Beyond serving your everyday needs, we want to provide our users with information that is interesting, useful and relevant. That’s why we’re always coming up with new widgets.

Building each widget takes close collaboration across multiple different teams – from Product Management to Design, Engineering, Behavioral Science, and Data Science and Analytics. Sounds like a lot of people, doesn’t it? But you’ll be surprised to hear that this behind-the-scenes collaboration works rapidly, usually in the span of one month! Which means we’re often moving from ideation phase to product release in just a few weeks.

Travel Trends Widget

This fast-and-furious process is anchored on one word – “customer-centric”. And that’s how it all began with our  “Travel Trends Widget” – a widget that provides passengers with an overview of historical supply and demand trends for their current location and nearby time periods.

Because we had so much fun developing this widget, we wanted to write a blog post to share with you what we did and how we did it!

Inspiration: Where it all started

Transport demand can be rather lumpy. Owing to organic patterns (e.g. office hours), a lot of passengers tend to request for cars around the same time. In periods like this, the increase in demand could outpace the arrival of driver supply, increasing the waiting time for passengers.

Our goal at Grab is to make sure people get a ride when they want it and at the price they want, so we got to thinking about how we can ease this friction by leveraging our treasure trove – Big Data! – to help our passengers better plan their trips.

As we were looking at the data, we noticed that there is a seasonality to demand and supply: at certain times and days, imbalances appear, peak and disappear, and the process repeats itself. Studies say that humans in general, unless shown a compelling reason or benefit for change, are habitual beings subject to inertia. So we set out to achieve exactly that: To create a widget to surface information to our passengers that may help them alter their decisions on when they choose to book a ride, thereby redistributing some of the present peak demands to periods just before and after peak – also known as “peak shifting the demand”!

While this widget is the first-of-its-kind in the ride-hailing industry, “peak-shifting” was actually coined and introduced long ago!

London Transport Museum Trends

As you can see from this post from the London Transport Museum (Source: Transport for London), London tube tried peak-shifting long before anyone else: Original Ad from 1928 displayed on the left, and Ad from 2015 displayed on the right, comparing the trends to 1928.

Trends from a hotel in Beijing

You may also have seen something similar at the last hotel you stayed at. Notice here a poster in an elevator at a Beijing hotel, announcing the best times to eat breakfast in comfort and avoid the crowd. (Photo credits to Prashant, our Product Manager, who saw this on holiday.)

To apply “peak-shifting” and help our users better plan their trips, we decided to dig in and leverage our data. It was way more complex than we had initially thought, as market conditions could be different on different days. This meant that  generic statements like “5PM-8PM are peak hours and prices will be hight” would not hold true. Contrary to general perception, we observed that even during peak hours, there are buckets of time when there is no surge or low surge.

For instance, plot 1 and plot 2 below shows how a typical Monday and Tuesday surge looks like in a given month respectively. One of the key insights is that the surge trends during peak hour is different on Monday from Tuesday. It reinforces our initial hypothesis that every day is unique.

So we used machine learning techniques to build a forecasting widget which can help our users and give them the power to plan their trips beforehand. This widget is able to provide the pricing trends for the next 2 hours. So with a bit of flexibility, riders can ride the tide!

Grab trends

So how exactly does this widget work?!

Historical trends for Monday

It pulls together historically observed imbalances between supply and demand, for the consumer’s current location and nearby time periods. Aggregated data is displayed to consumers in easily interpreted visualisations, so that they can plan to leave at times when there are more supply, and with potentially more savings for fares.

How did we build the widget? Loop, agile working process, POC & workstream

Widget-building is an agile, collaborative, and simultaneous process. First, we started the process with analysis from Product Analytics team, pulling out data on traffic trends, surge patterns, and behavioral insights of both passengers and drivers in Singapore.

When we noticed the existence of seasonality for each day of the week, we came up with more precise analytical and business questions to dig deeper into the data. Upon verification of hypotheses, we decided that we will build a widget.

Then joined the Behavioural Science, UX (User Experience) Design and the Product Management teams, who started giving shape to the problem we are solving. Our Behavioural Scientists shared their expertise on how information, suggestions and choices should be presented to enable easy assimilation and beneficial action. Daily whiteboarding breakouts, endless back-and forth conversations, and a healthy amount of challenge-and-accept culture ensured that we distilled the idea down to its core. We then presented the relevant information with just the right level of detail, and with the right amount of messaging, to allow users to take the intended action i.e. shift his/her demand outside of peak periods if possible.

Our amazing regional Copywriting team then swung in to put our intent into words in 7 different languages for our users across South-East Asia. Simultaneously, our UX designers and Full-stack Engineers started exploring the best visual components to communicate data on time trends to users. More on this later, but suffice to say that plenty of ideas were explored and discarded in a collaborative process, which aimed to create something that’s intuitive and engaging while being robust and scalable to work across all types of devices.

While these designs made their way up to engineering, the Data Science team worked on finding the most rigorous method to deduce the historical trend of surge across all our cities and areas, and time periods within them. There were discussions on how to best store and update this data reliably so that the widget itself can access it with great performance.

Soon after, we went into the development process, and voila! We had the first iteration of the widget ready on our staging (internal testing) servers in just 2 weeks! This prototype was opened up to the core team for influx of feedback.

And just two weeks later, the widget made its way to our Singapore and Jakarta Feeds, accessible to the world at large! Feedback from our users started pouring in almost immediately (thanks to the rich feedback functionality that comes with each widget), ranging from great to sometimes not-so-great, and we listened to all of it with a keen ear! And thus began a new cycle of iterations and continuous improvement, more of which we will share in a subsequent post.

In the trenches with the creators: How multiple teams got together to make this come true

Various disciplines within our cross functional team came together to whip out this widget by quipping their expertise to the end product.

Using Behavioural Science to simplify choices and design good outcomes

Behavioural Science helped to explore many facets of consumer behaviour in order to plan and design the widget: understanding how consumers think and conceptualizing a widget that can be easily understood and used by the consumers.

While fares are governed entirely by market conditions, it’s important for us to explain the economics to customers. As a customer-centric company, we aim to make the consumers feel like they own their decisions, which they can take based on full information. And this is the role of Behavioral Scientists at Grab!

In guiding the customers through the information, Behavioural Science team had the following three objectives in mind while building this Travel Trends widget:

  1. Offer transparency on the fares: By exposing our historic surge levels for a 4 hour period, we wanted to ensure that the passenger is aware of the surge levels and does not treat the fare as a nasty shock.
  2. Give information that helps them plan: By showing them surge levels for the future 2 hours, we wanted to help customers who have the flexibility, plan for a better time, hence, giving them the power to decide based on transparent information.
  3. Provide helpful tips: Every bar gives users tips on the conditions at that time and the immediate future. For instance, a low surge bar, followed by a high surge bar gives the tip “Psst… Leave now, It might get busy later!”, helping people understand the graph better and nudging them to take an action. If you are interested in saving fares, may we suggest tapping around all the bars to reveal the secret pro-tips?

Designing interfaces that lead to consumer success by abstracting complexity

Design team is the one behind the colors and shapes that make up the widget that you see and interact with! The team took inspiration from Google’s Popular Times.

Source/Credits: Google Live Popular Times
Source/Credits: Google Live Popular Times


Right from the offset, our content and product teams were keen to surface additional information and actions with each bar to keep the widget interactive and useful. One of the early challenges was to arrive at the right gesture that invites the user to interact and intuitively navigate the bars on the widget but also does not conflict with other gestures (eg scrolling and scrubbing) that the user was pre-trained to perform on the feed. We found out that tapping was simultaneously an unused and yet intuitive gesturethat we could use for interaction with the bars.

We then went into rounds of iteration on the visual design of the widget. In this process, multiple stakeholders were involved ranging from Product to Content to Engineering. We had to overcome a number of constraints i.e. the limited canvas of a widget and the context of a user when she is exploring the feed. By re-using existing libraries and components, we managed to keep the development light and ship something fast.

GrabCar trends near you

Dozens of revisions and four iterations later, we landed with a design that we felt equipped the feature for its user-facing goal, and did so in a manner which was aesthetically appealing!

And finally we managed to deliver on the feature’s goal, by surfacing just the right detail of information in a manner that is intuitive yet effective to peak-shift demand.  

Bringing all of this to fruition through high performance engineering

Our Development Engineering team was in charge of developing the widget and making it available to our users in just a few weeks’ time – materialising the work of the other teams.

One of their challenges was to find the best way to process the vast amount of data (millions of database entries) so it can be visualized simply as bar charts. Grab’s engineers had to achieve this while making sure performance is as resilient as possible.

There were two options in doing this:

a) Fetch the data directly from the DB for each API call; or

b) Store the data in an in-memory data structure on a timely basis, so when a user calls the API will no longer have to hit the DB.

After considering that this feature will likely expect a lot of traffic thus high QPS, we decided that the former option would be too costly. Ultimately, we chose the latter option since it is more performant and more scalable.

At the frontend, the challenge was to cater to the intricate request from our designers. We use chart libraries to increase our development speed, and not all of the requirements were readily supported by these libraries.

For instance, let’s say this library makes visualising charts easy, but not so much for customising them. If designers wanted to have an average line in a dotted form, the library did not support this so easily. Also, the moving arrow pointers as you move between bar chart, changing colors of the bars changes when clicked – all required countless CSS tweaks.

CSS tweak on trends widget
CSS tweak on trends widget

Closing the product loop with user feedback and data driven insights

One of the most crucial parts of launching any product is to ensure that customers are engaging with the widget and finding it useful.

To understand what customers think about the widget, whether they find it useful and whether it is helping them to plan better,  we delved into the huge mine of clickstream data.

User feedback on the trends widget

We found that 1 in 3 users who make a booking everyday interact with the widget. And of these people, more than 70% users have given positive rating for the widget. This validates our initial hypothesis that if given an option, our customers will love the freedom to plan their trips and inculcate more transparent ecosystem.

These users also indicate the things they like most about the widget. 61% of users gave positive rating for usefulness, 20% were impressed by the design (Kudos to our fantastic designer Ajmal!!) and 13% for usability.

Tweet about the widget

Beyond internal data, our widget made some rounds on social media channels. For Example, here is screenshot of what our users have to say on Twitter.

We closely track these metrics on user engagement and feedback to ensure that we keep improving and coming up with new iterations which helps us to serve our customers in a better way.


We hope you enjoyed reading about how we went from ideation, through iterations to a finished widget in the hands of the user, all in 1 month! Many hands helped along the way. If you are interested in joining this hyper-proactive problem-solving team, please check out Grab’s career site!

And if you have feedback for us, we are here to listen! While we cannot be happier to see some positive reaction from the public, we are also thrilled to hear your suggestions and advice. Please leave us a memo using the Widget’s comment function!


We just released an upgrade to this widget which allows users to set reminders and be notified about availability of good fares in a time period of their choosing. We will keep a watch and come knocking! Go ahead, find the widget on your Grab feed, set a reminder and save on fares on your next ride!

Alerting, monitoring, and reporting for PCI-DSS awareness with Amazon Elasticsearch Service and AWS Lambda

Post Syndicated from Michael Coyne original https://aws.amazon.com/blogs/security/alerting-monitoring-and-reporting-for-pci-dss-awareness-with-amazon-elasticsearch-service-and-aws-lambda/

Logging account activity within your AWS infrastructure is paramount to your security posture and could even be required by compliance standards such as PCI-DSS (Payment Card Industry Security Standard). Organizations often analyze these logs to adapt to changes and respond quickly to security events. For example, if users are reporting that their resources are unable to communicate with the public internet, it would be beneficial to know if a network access list had been changed just prior to the incident. Many of our customers ship AWS CloudTrail event logs to an Amazon Elasticsearch Service cluster for this type of analysis. However, security best practices and compliance standards could require additional considerations. Common concerns include how to analyze log data without the data leaving the security constraints of your private VPC.

In this post, I’ll show you not only how to store your logs, but how to put them to work to help you meet your compliance goals. This implementation deploys an Amazon Elasticsearch Service domain with Amazon Virtual Private Cloud (Amazon VPC) support by utilizing VPC endpoints. A VPC endpoint enables you to privately connect your VPC to Amazon Elasticsearch without requiring an internet gateway, NAT device, VPN connection, or AWS Direct Connect connection. An AWS Lambda function is used to ship AWS CloudTrail event logs to the Elasticsearch cluster. A separate AWS Lambda function performs scheduled queries on log sets to look for patterns of concern. Amazon Simple Notification Service (SNS) generates automated reports based on a sample set of PCI guidelines discussed further in this post and notifies stakeholders when specific events occur. Kibana serves as the command center, providing visualizations of CloudTrail events that need to be logged based on the provided sample set of PCI-DSS compliance guidelines. The automated report and dashboard that are constructed around the sample PCI-DSS guidelines assist in event awareness regarding your security posture and should not be viewed as a de facto means of achieving certification. This solution serves as an additional tool to provide visibility in to the actions and events within your environment. Deployment is made simple with a provided AWS CloudFormation template.

Figure 1: Architectural diagram

Figure 1: Architectural diagram

The figure above depicts the architecture discussed in this post. An Elasticsearch cluster with VPC support is deployed within an AWS Region and Availability Zone. This creates a VPC endpoint in a private subnet within a VPC. Kibana is an Elasticsearch plugin that resides within the Elasticsearch cluster, it is accessed through a provided endpoint in the output section of the CloudFormation template. CloudTrail is enabled in the VPC and ships CloudTrail events to both an S3 bucket and CloudWatch Log Group. The CloudWatch Log Group triggers a custom Lambda function that ships the CloudTrail Event logs to the Elasticsearch domain through the VPC endpoint. An additional Lambda function is created that performs a periodic set of Elasticsearch queries and produces a report that is sent to an SNS Topic. A Windows-based EC2 instance is deployed in a public subnet so users will have the ability to view and interact with a Kibana dashboard. Access to the EC2 instance can be restricted to an allowed CIDR range through a parameter set in the CloudFormation deployment. Access to the Elasticsearch cluster and Kibana is restricted to a Security Group that is created and is associated with the EC2 instance and custom Lambda functions.

Sample PCI-DSS Guidelines

This solution provides a sample set of (10) PCI-DSS guidelines for events that need to be logged.

  • All Commands, API action taken by AWS root user
  • All failed logins at the AWS platform level
  • Action related to RDS (configuration changes)
  • Action related to enabling/disabling/changing of CloudTrail, CloudWatch logs
  • All access to S3 bucket that stores the AWS logs
  • Action related to VPCs (creation, deletion and changes)
  • Action related to changes to SGs/NACLs (creation, deletion and changes)
  • Action related to IAM users, roles, and groups (creation, deletion and changes)
  • Action related to route tables (creation, deletion and changes)
  • Action related to subnets (creation, deletion and changes)

Solution overview

In this walkthrough, you’ll create an Elasticsearch cluster within an Amazon VPC environment. You’ll ship AWS CloudTrail logs to both an Amazon S3 Bucket (to maintain an immutable copy of the logs) and to a custom AWS Lambda function that will stream the logs to the Elasticsearch cluster. You’ll also create an additional Lambda function that will run once a day and build a report of the number of CloudTrail events that occurred based on the example set of 10 PCI-DSS guidelines and then notify stakeholders via SNS. Here’s what you’ll need for this solution:

To make it easier to get started, I’ve included an AWS CloudFormation template that will automatically deploy the solution. The CloudFormation template along with additional files can be downloaded from this link. You’ll need the following resources to set it up:

  • An S3 bucket to upload and store the sample AWS Lambda code and sample Kibana dashboards. This bucket name will be requested during the CloudFormation template deployment.
  • An Amazon Virtual Private Cloud (Amazon VPC).

If you’re unfamiliar with how CloudFormation templates work, you can find more info in the CloudFormation Getting Started guide.

AWS CloudFormation deployment

The following parameters are available in this template.

Elasticsearch Domain NameName of the Amazon Elasticsearch Service domain.
Elasticsearch Version6.2Version of Elasticsearch to deploy.
Elasticsearch Instance Count3The number of data nodes to deploy in to the Elasticsearch cluster.
Elasticsearch Instance ClassThe instance class to deploy for the Elasticsearch data nodes.
Elasticsearch Instance Volume Size10The size of the volume for each Elasticsearch data node in GB.
VPC to launch intoThe VPC to launch the Amazon Elasticsearch Service cluster into.
Availability Zone to launch intoThe Availability Zone to launch the Amazon Elasticsearch Service cluster into.
Private Subnet IDThe subnet to launch the Amazon Elasticsearch Service cluster into.
Elasticsearch Security GroupA new Security Group is created that will be associated with the Amazon Elasticsearch Service cluster.
Security Group DescriptionA description for the above created Security Group.
Windows EC2 Instance Classm5.largeWindows instance for interaction with Kibana.
EC2 Key PairEC2 Key Pair to associate with the Windows EC2 instance.
Public SubnetPublic subnet to associate with the Windows EC2 instance for access.
Remote Access Allowed CIDR0.0.0.0/0The CIDR range to allow remote access (port 3389) to the EC2 instance.
S3 Bucket Name—Lambda FunctionsS3 Bucket that contains custom AWS Lambda functions.
Private SubnetPrivate subnet to associate with AWS Lambda functions that are deployed within a VPC.
CloudWatch Log Group NameThis will create a CloudWatch Log Group for the AWS CloudTrail event logs.
S3 Bucket Name—CloudTrail loggingThis will create a new Amazon S3 Bucket for logging CloudTrail events. Name must be a globally unique value.
Date range to perform queriesnow-1d(examples: now-1d, now-7d, now-90d)
Lambda Subnet CIDRCreate a Subnet CIDR to deploy AWS Lambda Elasticsearch query function in to
Availability Zone—LambdaThe availability zone to associate with the preceding AWS Lambda Subnet
Email Address[email protected]Email address for reporting to notify stakeholders via SNS. You must accept the subscription by selecting the link sent to this address before alerts will arrive.

It takes 30-45 minutes for this stack to be created. When it’s complete, the CloudFormation console will display the following resource values in the Outputs tab. These values can be referenced at any time and will be needed in the following sections.

oElasticsearchDomainEndpointElasticsearch Domain Endpoint Hostname
oKibanaEndpointKibana Endpoint Hostname
oEC2InstanceWindows EC2 Instance Name used for Kibana access
oSNSSubscriberSNS Subscriber Email Address
oElasticsearchDomainArnArn of the Elasticsearch Domain
oEC2InstancePublicIpPublic IP address of the Windows EC2 instance

Managing and testing the solution

Now that you’ve set up the environment, it’s time to configure the Kibana dashboard.

Kibana configuration

From the AWS CloudFormation output, gather information related to the Windows-based EC2 instance. Once you have retrieved that information, move on to the next steps.

Initial configuration and index pattern

  1. Log into the Windows EC2 instance via Remote Desktop Protocol (RDP) from a resource that is within the allowed CIDR range for remote access to the instance.
  2. Open a browser window and navigate to the Kibana endpoint hostname URL from the output of the AWS CloudFormation stack. Access to the Elasticsearch cluster and Kibana is restricted to the security group that is associated with the EC2 instance and custom Lambda functions during deployment.
  3. In the Kibana dashboard, select Management from the left panel and choose the link for Index Patterns.
  4. Add one index pattern containing the following: cwl-*
    Figure 2: Define the index pattern

    Figure 2: Define the index pattern

  5. Select Next Step.
  6. Select the Time Filter Field named @timestamp.
    Figure 3: Select "@timestamp"

    Figure 3: Select “@timestamp”

  7. Select Create index pattern.

At this point we’ve launched our environment and have accessed the Kibana console. Within the Kibana console, we’ve configured the index pattern for the CloudWatch logs that will contain the CloudTrail events. Next, we’ll configure visualizations and a dashboard.

Importing sample PCI DSS queries and Kibana dashboard

  1. Copy the export.json from the location you extracted the downloaded zip file to the EC2 Kibana bastion.
  2. Select Management on the left panel and choose the link for Saved Objects.
  3. Select Import in upper right corner and navigate to export.json.
  4. Select Yes, overwrite all saved objects, then select Index Pattern cwl-* and confirm all changes.
  5. Once the import completes, select PCI DSS Dashboard to see the sample dashboard and queries.

Note: You might encounter an error during the import that looks like this:

Figure 4: Error message

Figure 4: Error message

This simply means that your streamed logs do not have login-type events in the time period since your deployment. To correct this, you can add a field with a null event.

  1. From the left panel, select Dev Tools and copy the following JSON into the left panel of the console:
            POST /cwl-/default/
                "userIdentity": {
                    "userName": "test"

  2. Select the green Play triangle to execute the POST of a document with the missing field.
    Figure 5: Select the "Play" button

    Figure 5: Select the “Play” button

  3. Now reimport the dashboard using the steps in Importing Sample PCI DSS Queries and Kibana Dashboard. You should be able to complete the import with no errors.

At this point, you should have CloudTrail events that have been streamed to the Elasticsearch cluster, with a configured Kibana dashboard that looks similar to the following graphic:

Figure 6: A configured Kibana dashboard

Figure 6: A configured Kibana dashboard

Automated Reports

A custom AWS Lambda function was created during the deployment of the Amazon CloudFormation stack. This function uses the sample PCI-DSS guidelines from the Kibana dashboard to build a daily report. The Lambda function is triggered every 24 hours and performs a series of Elasticsearch time-based queries of now-1day (the last 24 hours) on the sample guidelines. The results are compiled into a message that is forwarded to Amazon Simple Notification Service (SNS), which sends a report to stakeholders based on the email address you provided in the CloudFormation deployment.

The Lambda function will be named <CloudFormation Stack Name>-ES-Query-LambdaFunction. The Lambda Function enables environment variables such as your query time window to be adjusted or additional functionality like additional Elasticsearch queries to be added to the code. The below sample report allows you to monitor any events against the sample PCI-DSS guidelines. These reports can then be further analyzed in the Kibana dashboard.

    Logging Compliance Report - Wednesday, 11. July 2018 01:06PM
    Violations for time period: 'now-1d'
    All Failed login attempts
    - No Alerts Found
    All Commands, API action taken by AWS root user
    - No Alerts Found
    Action related to RDS (configuration changes)
    - No Alerts Found
    Action related to enabling/disabling/changing of CloudTrail CloudWatch logs
    - 3 API calls indicating alteration of log sources detected
    All access to S3 bucket that stores the AWS logs
    - No Alerts Found
    Action related to VPCs (creation, deletion and changes)
    - No Alerts Found
    Action related to changes to SGs/NACLs (creation, deletion and changes)
    - No Alerts Found
    Action related to changes to IAM roles, users, and groups (creation, deletion and changes)
    - 2 API calls indicating creation, alteration or deletion of IAM roles, users, and groups
    Action related to changes to Route Tables (creation, deletion and changes)
    - No Alerts Found
    Action related to changes to Subnets (creation, deletion and changes)
    - No Alerts Found         


At this point, you have now created a private Elasticsearch cluster with Kibana dashboards that monitors AWS CloudTrail events on a sample set of PCI-DSS guidelines and uses Amazon SNS to send a daily report providing awareness in to your environment—all isolated securely within a VPC. In addition to CloudTrail events streaming to the Elasticsearch cluster, events are also shipped to an Amazon S3 bucket to maintain an immutable source of your log files. The provided Lambda functions can be further modified to add additional or more complex search queries and to create more customized reports for your organization. With minimal effort, you could begin sending additional log data from your instances or containers to gain even more insight as to the security state of your environment. The more data you retain, the more visibility you have into your resources and the closer you are to achieving Compliance-on-Demand.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.


Michael Coyne

Michael is a consultant for AWS Professional Services. He enjoys the fast-paced environment of ever-changing technology and assisting customers in solving complex issues. Away from AWS, Michael can typically be found with a guitar and spending time with his wife and two young kiddos. He holds a BS in Computer Science from WGU.

New – Amazon Kinesis Data Analytics for Java

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-amazon-kinesis-data-analytics-for-java/

Customers are using Amazon Kinesis to collect, process, and analyze real-time streaming data. In this way, they can react quickly to new information from their business, their infrastructure, or their customers. For example, Epic Games ingests more than 1.5 million game events per second for its popular online game, Fornite.

With Amazon Kinesis Data Analytics you can process data in real-time using standard SQL. While SQL provides an easy way to quickly query large volumes of streaming data without learning new frameworks or languages, many customers also want to build more sophisticated data processing applications using general-purpose programming languages.

Using Java with Amazon Kinesis Data Analytics

Today, we are introducing support for Java in Amazon Kinesis Data Analytics. Now, developers can use their own Java code to create powerful real-time applications that process streaming data like continuously transforming and loading data into their data lakes, generating metrics to feed real-time gaming leaderboards, applying machine learning models to data streams from connected devices, and more.

To use this new functionality, developers build applications using open source libraries which include built-in operators for common data processing functions that allow applications to organize, transform, aggregate, and analyze data at any scale. These libraries are both open source and you can run them anywhere:

  • Apache Flink, an open source framework and engine for processing data streams.
  • AWS SDK for Java, providing Java APIs for many AWS services.

Developers can use these Java libraries within their Integrated Development Environment (IDE) of choice. Using these libraries, the following AWS services can be integrated with as little as one line of code:

  • Streaming Data Sources: Amazon Kinesis Data Streams
  • Streaming Destinations: Amazon S3, Amazon DynamoDB, Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose

In addition to the pre-built AWS integrations, the Java libraries include more connectors to tools like Cassandra, ElasticSearch, RabbitMQ, Redis, and more, and the ability to build custom integrations.

Building a Kinesis Data Streams Java Application

I prepared a simple Java application that implements the “mandatory” word count example for data processing. I send some paragraphs of text in input and I get, every five seconds, the number of times each word is being used as output.

First, I create two Kinesis Data Streams:

  • TextInputStream, where I am going to send my input records
  • WordCountOutputStream, where I am going to read the output of the Java application


Here is the code of the word-count Java application. To read and write from Kinesis Data Streams, I am using the Kinesis Connector from the Apache Flink project.

public class StreamingJob {

    private static final String region = "us-east-1";
    private static final String inputStreamName = "TextInputStream";
    private static final String outputStreamName = "WordCountOutputStream";

    private static DataStream<String> createSourceFromStaticConfig(
            StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
            new SimpleStringSchema(), inputProperties));

    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new
            SimpleStringSchema(), outputProperties);
        return sink;

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =

        DataStream<String> input = createSourceFromStaticConfig(env);

        input.flatMap(new Tokenizer())
             .map(new MapFunction<Tuple2<String, Integer>, String>() {
                 public String map(Tuple2<String, Integer> value) throws Exception {
                     return value.f0 + "," + value.f1.toString();

        env.execute("Word Count");

    public static final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {

		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
			String[] tokens = value.toLowerCase().split("\\W+");
			for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2<>(token, 1));

The most important part of the application is the manipulation of the input object, where I apply a few DataStream Transformations:

  1. I start with a DataFrame containing the String from the input stream.
  2. I use a Tokenizer in a FlatMap to split the sentence into “words”, each word followed by the number “1”.
  3. I apply the KeyBy operator to logically partition the stream in respect to the “word”.
  4. I use a 5 seconds tumbling window.
  5. I aggregate within the window, summing up for each word the number “1” to count them.
  6. I use a simple Map for each record to join the word and the number into a comma-separated values (CSV) String that I send to the output stream.

One of the most powerful operators shown here is the KeyBy operator. It enables you to re-organize a particular stream by a specified key in real-time. This type of re-keying enables further downstream operations like aggregations, counts, and much more. This enables you to set up streaming map-reduce on different keys within the same application.

I build the Java application using Maven and load the output JAR to an Amazon Simple Storage Service (S3) bucket in the region where I want to deploy the application. In the Kinesis Data Analytics console, I create a new application and select “Flink” as runtime:

I then configure the application to use the code on my S3 bucket. The console updates the IAM role for the application to have permissions to read the code.

You can optionally add key/value properties to the configuration of the application. You can read those properties from within the application, to provide customization at deployment time.

For monitoring, I leave the default metrics. I enable logging to Amazon CloudWatch, for errors only.

Don’t forget to add permissions to the IAM role created by the console to allow the Kinesis Analytics application to read and write from the streams used for input and output, TextInputStream and WordCountOutputStream in my case.

I can now start the application with the “Run” button, and when it is running, I use a script that I prepared to put some text (I am using a description of the Amazon Kinesis platform) in the input stream:

$ python put_records.py TextInputStream
Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data...

The behavior of my application is summarized in the console in the Application Graph, a visual representation of the data flow consisting of operators and intermediate results (complex applications, using multiple streams, have a much more interesting graph):

To read the output stream, I am using a Lambda function written in Python. I am using the one provided with the Kinesis Record Aggregation & Deaggregation Modules for AWS Lambda, that provides automatic “de-aggregation” of records aggregated by the Amazon Kinesis Producer Library (KPL).

As expected, in the CloudWatch Logs console I get the list of the words and the number of times they were used, updated every 5 seconds by the Lambda function:

Pricing and Availability

With Amazon Kinesis Data Analytics for Java, you pay only for what you use. Pricing is similar to Amazon Kinesis Data Analytics for SQL, but there are a few differences.

For Java applications, you are charged a single additional Amazon Kinesis Processing Unit (KPU) per application, used for application orchestration. Java applications are also charged for running application storage and durable application backups. Running application storage is used for Amazon Kinesis Data Analytics’ stateful processing capabilities and is charged per GB-month. Durable application backups are optional and provide a point-in-time recovery point for applications, charged per GB-month.

For example, pricing is $0.11 per KPU hour in US East (N. Virginia), and you are charged for running application storage ($0.10 per GB-month) and durable application backups ($0.023 per GB-month).

Available Now

Amazon Kinesis Data Analytics for Java is available now in US East (N. Virginia), US East (Ohio), US West (Oregon), EU West (Ireland).

I only scratched the surface of the capabilities for stream processing enabled by the support of Java in Amazon Kinesis Data Analytics. I think this is a powerful tool that can enable new use cases. Let me know what you are going to build with it!

Scaling Amazon Kinesis Data Streams with AWS Application Auto Scaling

Post Syndicated from Giorgio Nobile original https://aws.amazon.com/blogs/big-data/scaling-amazon-kinesis-data-streams-with-aws-application-auto-scaling/

Recently, AWS launched a new feature of AWS Application Auto Scaling that let you define scaling policies that automatically add and remove shards to an Amazon Kinesis Data Stream. For more detailed information about this feature, see the Application Auto Scaling GitHub repository.

As your streaming information increases, you require a scaling solution to accommodate all requests. If you have a decrease in streaming information, you might use scaling to reduce costs. Currently, you scale an Amazon Kinesis Data Stream shard programmatically. Alternatively, you can use the Amazon Kinesis Scaling Utilities. To do so, you can use each utility manually, or automated with an AWS Elastic Beanstalk environment.

With the new feature of Application Auto Scaling, you can use AWS services to create a scaling solution without manual intervention or complex solutions.

Auto scaling solution overview

This blog post shows you how to deploy an auto scaling solution for your Amazon Kinesis Data Streams based on the default Amazon CloudWatch metrics. It also provides an AWS CloudFormation template to set up the environment automatically and the code related to the lambda function.

How the auto scaling solution works

Begin with a CloudWatch alarm that monitors Kinesis Data Stream shard metrics. When a custom threshold of the alarm is reached, for example because the number of requests has grown, the alarm is fired. This firing sends a notification to an Application Auto Scaling policy that responds based on the stated preference, scale up or down.

When the scaling policy is triggered, Application Auto Scaling calls an API operation. The call passes the new number of Kinesis Data Stream shards for the desired capacity (for more information, see here). The call also passes the name of the resource to scale, provided by Amazon API Gateway. Amazon API Gateway invokes an AWS Lambda function. Based on the information sent by Application Auto Scaling, the Lambda function increases or decreases the number of shards in the Kinesis Data Stream. It does so by using Kinesis Data Stream’s UpdateShardCount API operation. The following diagram illustrates the scenario.

As you can see from the diagram, AWS System Manager Parameter Store is also involved. We use Parameter Store to store the desired capacity value that Application Auto Scaling sends to API Gateway to increase or decrease the capacity. (In this scenario, the capacity is the number of shards.) In fact, Application Auto Scaling often invokes API Gateway to get the status of the custom resource, in this case the Kinesis Data Stream. It does so to see if there are actions to be taken and if previous actions were successful. Because Lambda is stateless, we need somewhere to save the desired capacity value communicated by Application Auto Scaling at any point.

Solution components

This solution uses the following components:

Application Auto Scaling scalable target – A scalable target is a resource registered with the Application Auto Scaling service. The service can scale any defined and registered resources. A scalable target handles the minimum and maximum value for the scalable dimension. It requires the following parameters:

  • ResourceId: The resource that is the scalable target. For custom resources, such as in the following example, specify the OutputValue returned from the AWS CloudFormation template.
  • RoleARN: The service-linked role used to grant permission to modify scalable target resources.
  • ScalableDimension: The dimension of the scalable target. For custom resources, the value must be custom-resource:ResourceType:Property.
  • ServiceNamespace: The namespace of the AWS service. In this case, this value is the custom resource.

Scaling policy – After you register a scalable target, you can apply a scaling policy that describes how the service should scale.

The following policy types are supported:

  • TargetTrackingScaling — Only for Amazon DynamoDB
  • StepScaling — Supported by Amazon ECS, Amazon EC2 Spot Fleets, and Amazon RDS
  • TargetTrackingScaling — Supported by Amazon ECS, EC2 Spot Fleets, and Amazon RDS
  • StepScaling — Supported by other services

In our scenario, we use a StepScaling policy, because we are using a custom resource type, as discussed later in Scaling policy and scheduled actions section. However, custom resource type can also support scheduled actions.

API Gateway – In our solution, we use Amazon API Gateway to expose a secure REST endpoint. Application Auto Scaling uses this endpoint to send authenticated calls, using IAM, to get the current capacity of the custom service to scale with HTTP GET. Application Auto Scaling also uses this endpoint to adjust the relative capacity of the custom service (with HTTP PATCH).

CloudWatch metrics and alarms – KPI to monitor and trigger an alarm directed to the Application Auto Scaling endpoint.

Lambda function – In our scenario, the AWS Lambda function mainly does two tasks:

  1. If the API request is GET, the Lambda function returns JSON that includes the information of the status of the custom resource that Application Auto Scaling controls. In this case, this custom resource is the Kinesis Data Stream.
  2. If the API request is PATCH, the Lambda function stores the new desired capacity in a DynamoDB table. The Lambda function then calls the UpdateShardCount API operation for the Kinesis Data Stream.

AWS System Manager Parameter Store – KPI to monitor and trigger an alarm directed to the Application Auto Scaling endpoint.


Prerequisites for this solution include the following:

  • User credentials with permissions that allow you to configure automatic scaling and create the required service-linked role. For more information, see the Application Auto Scaling User Guide.
  • Permissions to create a stack using an AWS CloudFormation template, plus full access permissions to resources within the stack. For more information, see the AWS CloudFormation User Guide.

Scaling policy and scheduled actionsseconds

You can use the same architecture to work in two different situations for your Amazon Kinesis Data Stream:

  1. The first is predictable traffic, which means the scheduled actions. An example of predictable traffic is when your Kinesis Data Stream endpoint sees growing traffic in specific time window. In this case, you can make sure that an Application Auto Scaling scheduled action increases the number of Kinesis Data Stream shards to meet the demand. For instance, you might increase the number of shards at 12:00 p.m. and decrease them at 8:00 p.m.
  2. The second is the classic on-demand scenario, which specifies the scaling policy. In this case, you create an Application Auto Scaling scaling policy that increases or decreases the number of Kinesis Data Stream shards to meet the client demand.

In this blog post we are going to focus on the seconds scenario with the scaling policy, as we believe it is more challenging to implement.


Application Auto Scaling can scale up and down continuously to make sure that you can meet your demand. However, Kinesis Data Streams have some limitations to consider when configuring Application Auto Scaling. With Kinesis Data Streams, you can’t do the following:

  • Scale more than twice for each rolling 24-hour period for each stream
  • Scale up to more than double your current shard count for a stream
  • Scale down below half your current shard count for a stream
  • Scale up to more than 500 shards in a stream
  • Scale a stream with more than 500 shards down unless the result is fewer than 500 shards
  • Scale up to more than the shard limit for your account

If you need to scale more than once a day, you can use this AWS Support form to request an increase to this limit.

Choosing the metric

When choosing the metrics to monitor to scale up and down, we can use the stream-level metrics IncomingBytes and IncomingRecords, as described in the Kinesis Data Streams documentation. Kinesis supports streaming 1 MiB of data per second or 1000 records per second. We can use IncomingBytes and IncomingRecords to set an alarm based on a threshold, let’s say 80 percent. We do this to call the Application Auto Scaling service before Amazon Kinesis start throttling our requests. This is the most effective method to proactively scale our resource. However, we need to set up the right cooldown period in Application Auto Scaling to avoid multiple scaling actions triggered by both metrics at the same time.

Alternatively, we can use the WriteProvisionedThroughputExceeded metric to scale when we reach the Amazon Kinesis shard limit, as described in the CloudWatch documentation.

In this example, we use the first approach, using IncomingRecords.

Deploying and testing the solution

To test the solution, we can use the AWS CloudFormation template found here. The AWS CloudFormation template automatically creates for you: the API Gateway, the Lambda function, the Kinesis Data Stream, the DynamoDB table, and the Application Auto Scaling group, and its scaling policy.

Deploying the solution

To let AWS CloudFormation create these resources on your behalf:

  1. Open the AWS Management Console in the AWS Region you want to deploy the solution to, and on the Services menu, choose CloudFormation.
  2. Choose Create Stack, choose Upload a template to Amazon S3, and then choose the file custom-application-autoscaling-kinesis.yaml included in the solution.
  3. Give a friendly name to the stack. Specify the Amazon S3 bucket that contains the compressed version of AWS Lambda function (index.py) included in the solution.
  4. For Options, you can specify tags for your stack and an optional IAM role to be used by AWS CloudFormation to create resources. If the role isn’t specified, a new role is created. You can also perform additional configuration for rollback settings and notification options.
  5. The review section shows a recap of the information. Be sure to select the two AWS CloudFormation acknowledgements to allow AWS CloudFormation to create resources with custom names on your behalf. Also, create a change set, because the AWS CloudFormation template includes the AWS::Serverless-2016-10-31
  6. Choose stream level metrics to create the resources present in the stack.

Testing the solution

Now that the environment is created, test it. To manually fire the Amazon CloudWatch alarm, we must generate traffic to the stream. By taking advantage of the Amazon Kinesis Data Generator, this is an efficient way to do it.

  1. First, it is necessary to follow this guide to set up your Amazon Kinesis Data Generator https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html
  2. After the generator is created, it is necessary to select the Region and the newly created Kinesis Data Stream, in our case Kinesis-MyKinesisStream-1MUOGAD9OBCJH
  3. In Records per second insert a value greater than 1000 if you have one shard. Otherwise, multiply this number time the number of shards (for instance, if you have two shards, 1500 * 2 = 3000).
  4. In the form, enter test, and then choose Send data.
  5. Now that the traffic is being generated, open the Amazon CloudWatch console, and in Alarms, choose Alarms.
  6. In the ALARM list, select IncomingRecords-alarm-outOpen the History tab on the bottom of the page to see that the alarm triggered the Application Auto Scaling.

To verify that the number of open shards has been updated:

  1. Open the Amazon Kinesis console and select Data Streams, then select your Data Stream, in our case Kinesis-MyKinesisStream-1MUOGAD9OBCJH.
  2. In Details, it is possible to see that the number of shards increased to three, as shown in the following example:

Cleaning up the environment after testing

To clean up the environment after the testing, the procedure is straight-forward. By removing the AWS CloudFormation stack, everything is removed, as follows:

  1. Open the AWS Management Console in the AWS Region that you want to deploy the solution to, and select the CloudFormation stack from the list.
  2. Click on Actions and Delete Stack.
  3. OPTIONALLY: you can delete the S3 bucket and the Lambda function that you created.


This post described how you use Application Auto Scaling service to automatically scale Amazon Kinesis Data Stream. With the help of Amazon API Gateway, you can allow Application Auto Scaling to securely invoke the AWS Lambda function that interacts with the desired stream.

About the Authors

Giorgio Nobile works as Solutions Architect for Amazon Web Services in Italy. He works with enterprise customers and helps them to embrace the digital transformation. Giorgio’s field of expertise covers Big Data. In his free time, Giorgio loves playing with his two children and is addicted to DIY and snowboarding.




Diego Natali works as Solutions Architect for Amazon Web Services in Italy. With several years engineering background, he helps ISV and Start up customers designing flexible and resilient architectures using AWS services. In his spare time he enjoys watching movies and riding his dirt bike.





Collect, parse, transform, and stream Windows events, logs, and metrics using Amazon Kinesis Agent for Microsoft Windows

Post Syndicated from Harvir Singh original https://aws.amazon.com/blogs/big-data/collect-parse-transform-and-stream-windows-events-logs-and-metrics-using-amazon-kinesis-agent-for-microsoft-windows/

A complete data pipeline that includes Amazon Kinesis Agent for Microsoft Windows (KA4W) can help you analyze and monitor the performance, security, and availability of Windows-based services. You can build near-real-time dashboards and alarms for your Windows services. You can also use visualization and business intelligence tools such as Amazon Athena, Kibana, Amazon QuickSight, and Amazon CloudWatch to rapidly locate, diagnose, and resolve operational and security issues.

KA4W eliminates cloud-based log processing by parsing and transforming logs into standard formats such as JSON. These formats can then be immediately consumed by the visualization and business intelligence tools in the data pipeline.

Here are a few words from one of our customers summarizing their experience with KA4W:

“The new Amazon Kinesis Agent for Microsoft Windows has simplified our workflow for streaming logs by eliminating complicated orchestration between multiple interconnected systems. The agent was easy to set up, configure, update, and most importantly performs significantly better. In all, the Amazon Kinesis Agent for Microsoft Windows has the potential to markedly improve visibility of issues in our environment and reduce operational cost.”– Sanjay Kumar, senior software engineer at Autodesk Inc.

In this post, we review how the new Kinesis Agent for Windows enables streaming analytics use cases related to Windows applications, servers, and workstations. We also show you how to get started with the new agent. By using KA4W to push real-time data into Amazon Kinesis services, you can solve a wide range of operational issues, including the following:

  • Monitoring of Dynamic Host Configuration Protocol (DHCP) servers for identifying IP lease refusals in case of exhausted scopes
  • Monitoring of Microsoft Exchange servers for identifying top email senders, mail-storm situations, and heavy load conditions
  • Monitoring of web-based application and Internet Information Services (IIS) logs for performance, availability, and security issues
  • Monitoring of domain controllers for Active Directory and security issues
  • Enabling security intelligence platforms to ingest log files for forensics and penetration testing

Overview of Kinesis Agent for Windows

Amazon Kinesis Agent for Microsoft Windows (KA4W) is a configurable and extensible agent. It runs on Windows laptops, desktop computers, and servers, either on-premises or in the AWS Cloud. KA4W efficiently and reliably gathers, parses, transforms, and streams logs, events, and metrics to various AWS services, including Kinesis Data Streams, Kinesis Data Firehose, CloudWatch, and CloudWatch Logs.

Kinesis Agent for Windows provides built-in parsers that simplify the processing of logs from common Windows services such as Exchange, Active Directory, DHCP, Microsoft SharePoint, and Windows security logs. It solves many technical and operational challenges for streaming logs, events, and metrics to AWS services, including the following:

  • Handling large volume of log files, processing and transforming logs and events from many different sources and formats in near-real time
  • Handling different kinds of log rotation approaches and accessing log files even when those logs files are locked by log writers
  • Reducing data transfer and storage costs by filtering unnecessary data before delivering to AWS services
  • Adding context to the collected data that enables precise analysis and rapid resolution of operational and security issues
  • Providing data about the health of the agent itself, which confirms the accuracy and completeness of the data collected and streamed

The following diagram illustrates some of the ways you can build custom, real-time data pipelines by using Kinesis Agent for Windows and stream-processing frameworks.

Amazon Kinesis Agent for Microsoft Windows includes an array of plugins. By configuring these plugins, you can customize KA4W to satisfy most requirements for the collection, transformation, and near-real-time delivery of logs, events, and metrics. You can even create your own plugin if you have custom requirements. Plugins are categorized by sources, pipes, and sinks.

Sources are the plugins that gather various Windows logs, events, and metrics. KA4W comes with multiple built-in source plugins, including the following:

  • DirectorySource
  • ExchangeLogSource
  • 3SVCLogSource
  • UlsSource (SharePoint)
  • WindowsEventLogSource
  • WindowsETWEventSource
  • WindowsPerformanceCounterSource

For more information about these sources, see Source Declarations in the Kinesis Agent for Windows User Guide.

Pipes connect sources and sinks. You can use pipes to filter unnecessary data to improve data transfer and reduce storage and API usage cost. Filters improve data quality and provide an effective way to restrict the amount of data for analysis. For details on configuring pipes, see Pipe Declarations in the Kinesis Agent for Windows User Guide.

Sinks are the plugins that stream logs, event, and metrics data to different AWS services. Kinesis Agent for Windows comes with multiple built-in sink plugins, such as KinesisStream, KinesisFirehose, CloudWatch, and CloudWatchLogs. Kinesis Agent for Windows needs to authenticate with AWS services to send data. For details on sinks and security configuration, see Sink Declarations in the Kinesis Agent for Windows User Guide.

Monitoring a web server example

Suppose that you have a web-based application and want to monitor the underlying web server. To do so, follow these steps to build the data pipeline that is required to move Windows events and web server logs to Kinesis Data Firehose and Amazon S3 for analytics:

  1. Install Kinesis Agent for Windows
  2. Create the streams
  3. Set the permissions
  4. Configure Kinesis Agent for Windows
  5. Start Kinesis Agent for Windows
  6. View the ingested logs

Install Kinesis Agent for Windows

Go to the Amazon Kinesis Agent for Microsoft Windows download page, and follow the instructions to download the agent. For example, you can run the following command in an elevated PowerShell command prompt:

Invoke-Expression ((New-Object System.Net.WebClient).DownloadString('https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/InstallKinesisTap.ps1'))

Create streams

Create two Kinesis Data Firehose delivery streams named EventLogStream and W3SVCLogStream. Configure both streams to deliver data to Amazon Simple Storage Service (Amazon S3). To expedite log delivery to Amazon S3, reduce the delivery stream buffer size to 1 MB and the buffer interval to 60 seconds. This results in more frequent writes to Amazon S3.

Set permissions

If your host is an Amazon EC2 instance, the security configuration is simpler, and you only need to grant your Amazon EC2 instance role PutRecordBatch permission to the streams created. A sample IAM (security) policy looks like the following:

    "Version": "2012-10-17",
    "Statement": [
            "Effect": "Allow",
            "Action": [
            "Resource": [

Configure Kinesis Agent for Windows

For this example, replace the content of “C:\Program Files\Amazon\AWSKinesisTap\appsettings.json” with the following:

    "Sources": [
            "Id": "ApplicationLog",
            "SourceType": "WindowsEventLogSource",
            "LogName": "Application"
            "Id": "SecurityLog",
            "SourceType": "WindowsEventLogSource",
            "LogName": "Security"
            "Id": "SystemLog",
            "SourceType": "WindowsEventLogSource",
            "LogName": "System"
            "Id": "W3SVCLog1",
            "SourceType": "W3SVCLogSource",
            "Directory": "C:\\inetpub\\logs\\LogFiles\\W3SVC1",
            "FileNameFilter": "*.log",
            "TimeZoneKind": "UTC"
    "Sinks": [
            "Id": "EventLogSink",
            "SinkType": "KinesisFirehose",
            "StreamName": "EventLogStream",
            "Format": "json"
            "Id": "W3SVCLogSink",
            "SinkType": "KinesisFirehose",
            "Region": "us-west-2",
            "StreamName": "W3SVCLogStream",
            "Format": "json"
    "Pipes": [
            "Id": "ApplicationLogToFirehose",
            "SourceRef": "ApplicationLog",
            "SinkRef": "EventLogSink"
            "Id": "SecurityLogToFirehose",
            "SourceRef": "SecurityLog",
            "SinkRef": "EventLogSink"
            "Id": "SystemLogToFirehose",
            "SourceRef": "SystemLog",
            "SinkRef": "EventLogSink"
            "Id": "W3SVCLog1ToKinesisStream",
            "SourceRef": "W3SVCLog1",
            "SinkRef": "W3SVCLogSink"

In the preceding configuration file, there are four sources. The first three are for different Windows Event Logs, and the last one is for IIS (W3SVC) logs. Typically IIS writes logs to the C:\inetpub\logs\LogFiles\W3SVC1 directory.

There are two KinesisFirehose sinks: EventLogSink sends data to EventLogStream, and W3SVCLogSink sends data to W3SVCLogStream. For both sinks, you set the Format key-value pair to json. This instructs the agent to send logs in JSON-formatted data to Kinesis Data Firehose.

The first three pipes connect the first three sources to the EventLogSink, and the last pipe connects the last source to the W3SVCLogSink.

In this example, authentication and authorization are based on the instance profile that is associated with the Amazon EC2 instance that is running Kinesis Agent for Windows. There are other approaches to authentication and authorization. For more information, see “Sink Security Configuration” in Sink Declarations in the Kinesis Agent for Windows User Guide.

Start Kinesis Agent for Windows

Note: During the development of Kinesis Agent for Windows, the internal name was “AWSKinesisTap”. To maintain backward compatibility, we have maintained this terminology within the agent’s configuration.

There are a few ways to start the agent:

  1. Start the agent from the services applet.
  2. Start the agent from an elevated command prompt and run net start AWSKinesisTap.
  3. Start the agent from an elevated PowerShell command prompt and run Start-Service -Name AWSKinesisTap.

If Kinesis Agent for Windows does not start, check the application event log. If the agent starts, you can find the logs in “C:\Program Data\Amazon\AWSKinesisTap\logs”.

If you run into difficulty, see Troubleshooting in the Kinesis Agent for Windows User Guide.

View the log data

Here is a quick and easy way to view the ingested data in Amazon S3:

  1. Sign in to the AWS Management Console, and open the Amazon S3 console. Go to the S3 bucket that the Kinesis Data Firehose delivery streams are streaming to, and choose an object.
  2. Choose the Select from
  3. Under File format, choose JSON, and then choose Show file preview.

The following shows an example for a Windows Event Log:

The Amazon Kinesis Agent for Microsoft Windows User Guide contains a tutorial that demonstrates a powerful scenario using Amazon Athena to query the data in Amazon S3.


Amazon Kinesis Agent for Microsoft Windows is free to use. However, you pay for the AWS resources that interact with your complete data pipeline, such as Kinesis Data Streams, Kinesis Data Firehose, and Amazon S3.


Amazon Kinesis Agent for Microsoft Windows consumes a minimal amount of system resources. Memory and CPU usage can vary based on the amount of data streaming to AWS services and the virtual or physical hardware configuration of the machines hosting Kinesis Agent for Windows.


Amazon Kinesis Agent for Microsoft Windows streams logs, events, and metrics to AWS services. It is a key part of constructing an efficient, reliable, and cost-effective data pipeline for discovering, preventing, and resolving complex operational and security issues with Windows desktop and server machines. The example in this post demonstrates the simplicity of configuring a custom-tailored solution for gathering and streaming operational data from a web server host. Kinesis Agent for Windows allows you to choose the right AWS services for your scenarios to construct data pipelines and gain deeper insight into your operational challenges.

Going further

Kinesis Agent for Windows is very flexible and has many additional features. Here are a few suggestions for next steps:

  • Review the many configuration examples in the Kinesis Agent for Windows User Guide.
  • Learn about DirectorySource, a flexible source plugin that parses many types of text-based logs stored in the Windows file system. For details on supported parsers, see Source Declarations.
  • View the configuration file, which has variables support to allow the agent to get information from the environment variables and Amazon EC2 metadata.
  • Decorate the logs using TextDecoration and ObjectDecoration.
  • Configure Kinesis Agent for Windows to update itself and its configuration file. For more information, see Configuring Automatic Updates.

Additional resources

About the authors

Harvir Singh is a Software Development Manager for Amazon Kinesis Agent for Microsoft Windows.





Li Chen is a Senior Software Development Engineer for Amazon Kinesis Agent for Microsoft Windows.





Bonnie Feinberg is a Senior Software Development Engineer for Amazon Kinesis Agent for Microsoft Windows.

Exciting New Email Features Available Now in Amazon Pinpoint

Post Syndicated from Brent Meyer original https://aws.amazon.com/blogs/messaging-and-targeting/exciting-new-email-features-available-now-in-amazon-pinpoint/

Yesterday, we launched several exciting new email-related features in Amazon Pinpoint. The biggest of these features is the ability to send transactional emails. Transactional emails are a great way to send one-to-one emails to your customers without creating segments or campaigns first. For example, when a customer makes a purchase in your app, or requests a password reset, you can send them an email immediately and directly with the information that they need. You can send transactional emails by using the Amazon Pinpoint API or an AWS SDK, or by using the Amazon Pinpoint SMTP interface.

Along with the ability to send transactional emails, we also added a dashboard that gives you deep insights into the performance of the emails you’ve sent. You can use this dashboard to view several response metrics related to the transactional emails you’ve sent, including your delivery, bounce, and complaint rates, the number of emails that were opened and clicked, and more. You can filter this dashboard to include data from 1 to 30 days.

We’ve also added the ability to lease dedicated IP addresses through Amazon Pinpoint. Dedicated IP addresses are IP addresses that are reserved for your exclusive use. Dedicated IP addresses give you complete control over the reputations of the IP addresses that you use to send email through Amazon Pinpoint. When you use dedicated IP addresses, you can create pools of your dedicated IP addresses. Dedicated IP pools are groups of IP addresses you use for different purposes. For example, you can create a pool of addresses to use when sending transactional messages, and another pool for sending marketing communications.

We also added the following email-related capabilities:

  • Content personalization: Generic, one-size-fits-all emails tend to have lower engagement rates than personalized messages. When you send a transactional email, you can include personalization tags, such as the recipient’s name or location. When Amazon Pinpoint sends the mail, it replaces the personalization tags with the appropriate values for the recipient.
  • Event destinations: You can use event destinations to send information about email events to various destinations, including Amazon CloudWatch, Amazon SNS, and Amazon Kinesis Data Firehose. Email events include email opens, clicks, bounces, complaints, and rejections.
  • Raw email support: You can use the Amazon Pinpoint API or SMTP interface to send raw, MIME-formatted emails. MIME emails can include custom headers and file attachments.

These features are available now in all AWS Regions where Amazon Pinpoint is available. We hope that these features help you find exciting new ways to use Amazon Pinpoint to connect and engage with your customers.

Learn about AWS – November AWS Online Tech Talks

Post Syndicated from Robin Park original https://aws.amazon.com/blogs/aws/learn-about-aws-november-aws-online-tech-talks/

AWS Tech Talks

AWS Online Tech Talks are live, online presentations that cover a broad range of topics at varying technical levels. Join us this month to learn about AWS services and solutions. We’ll have experts online to help answer any questions you may have.

Featured this month! Check out the tech talks: Virtual Hands-On Workshop: Amazon Elasticsearch Service – Analyze Your CloudTrail Logs, AWS re:Invent: Know Before You Go and AWS Office Hours: Amazon GuardDuty Tips and Tricks.

Register today!

Note – All sessions are free and in Pacific Time.

Tech talks this month:


November 13, 2018 | 11:00 AM – 12:00 PM PTHow to Create a Chatbot Using Amazon Sumerian and Sumerian Hosts – Learn how to quickly and easily create a chatbot using Amazon Sumerian & Sumerian Hosts.


November 19, 2018 | 11:00 AM – 12:00 PM PTUsing Amazon Lightsail to Create a Database – Learn how to set up a database on your Amazon Lightsail instance for your applications or stand-alone websites.

November 21, 2018 | 09:00 AM – 10:00 AM PTSave up to 90% on CI/CD Workloads with Amazon EC2 Spot Instances – Learn how to automatically scale a fleet of Spot Instances with Jenkins and EC2 Spot Plug-In.


November 13, 2018 | 09:00 AM – 10:00 AM PTCustomer Showcase: How Portal Finance Scaled Their Containerized Application Seamlessly with AWS Fargate – Learn how to scale your containerized applications without managing servers and cluster, using AWS Fargate.

November 14, 2018 | 11:00 AM – 12:00 PM PTCustomer Showcase: How 99designs Used AWS Fargate and Datadog to Manage their Containerized Application – Learn how 99designs scales their containerized applications using AWS Fargate.

November 21, 2018 | 11:00 AM – 12:00 PM PTMonitor the World: Meaningful Metrics for Containerized Apps and Clusters – Learn about metrics and tools you need to monitor your Kubernetes applications on AWS.

Data Lakes & Analytics

November 12, 2018 | 01:00 PM – 01:45 PM PTSearch Your DynamoDB Data with Amazon Elasticsearch Service – Learn the joint power of Amazon Elasticsearch Service and DynamoDB and how to set up your DynamoDB tables and streams to replicate your data to Amazon Elasticsearch Service.

November 13, 2018 | 01:00 PM – 01:45 PM PTVirtual Hands-On Workshop: Amazon Elasticsearch Service – Analyze Your CloudTrail Logs – Get hands-on experience and learn how to ingest and analyze CloudTrail logs using Amazon Elasticsearch Service.

November 14, 2018 | 01:00 PM – 01:45 PM PTBest Practices for Migrating Big Data Workloads to AWS – Learn how to migrate analytics, data processing (ETL), and data science workloads running on Apache Hadoop, Spark, and data warehouse appliances from on-premises deployments to AWS.

November 15, 2018 | 11:00 AM – 11:45 AM PTBest Practices for Scaling Amazon Redshift – Learn about the most common scalability pain points with analytics platforms and see how Amazon Redshift can quickly scale to fulfill growing analytical needs and data volume.


November 12, 2018 | 11:00 AM – 11:45 AM PTModernize your SQL Server 2008/R2 Databases with AWS Database Services – As end of extended Support for SQL Server 2008/ R2 nears, learn how AWS’s portfolio of fully managed, cost effective databases, and easy-to-use migration tools can help.


November 16, 2018 | 09:00 AM – 09:45 AM PTBuild and Orchestrate Serverless Applications on AWS with PowerShell – Learn how to build and orchestrate serverless applications on AWS with AWS Lambda and PowerShell.

End-User Computing

November 19, 2018 | 01:00 PM – 02:00 PM PTWork Without Workstations with AppStream 2.0 – Learn how to work without workstations and accelerate your engineering workflows using AppStream 2.0.

Enterprise & Hybrid

November 19, 2018 | 09:00 AM – 10:00 AM PTEnterprise DevOps: New Patterns of Efficiency – Learn how to implement “Enterprise DevOps” in your organization through building a culture of inclusion, common sense, and continuous improvement.

November 20, 2018 | 11:00 AM – 11:45 AM PTAre Your Workloads Well-Architected? – Learn how to measure and improve your workloads with AWS Well-Architected best practices.


November 16, 2018 | 01:00 PM – 02:00 PM PTPushing Intelligence to the Edge in Industrial Applications – Learn how GE uses AWS IoT for industrial use cases, including 3D printing and aviation.

Machine Learning

November 12, 2018 | 09:00 AM – 09:45 AM PTAutomate for Efficiency with Amazon Transcribe and Amazon Translate – Learn how you can increase efficiency and reach of your operations with Amazon Translate and Amazon Transcribe.


November 20, 2018 | 01:00 PM – 02:00 PM PTGraphQL Deep Dive – Designing Schemas and Automating Deployment – Get an overview of the basics of how GraphQL works and dive into different schema designs, best practices, and considerations for providing data to your applications in production.


November 9, 2018 | 08:00 AM – 08:30 AM PTEpisode 7: Getting Around the re:Invent Campus – Learn how to efficiently get around the re:Invent campus using our new mobile app technology. Make sure you arrive on time and never miss a session.

November 14, 2018 | 08:00 AM – 08:30 AM PTEpisode 8: Know Before You Go – Learn about all final details you need to know before you arrive in Las Vegas for AWS re:Invent!

Security, Identity & Compliance

November 16, 2018 | 11:00 AM – 12:00 PM PTAWS Office Hours: Amazon GuardDuty Tips and Tricks – Join us for office hours and get the latest tips and tricks for Amazon GuardDuty from AWS Security experts.


November 14, 2018 | 09:00 AM – 10:00 AM PTServerless Workflows for the Enterprise – Learn how to seamlessly build and deploy serverless applications across multiple teams in large organizations.


November 15, 2018 | 01:00 PM – 01:45 PM PTMove From Tape Backups to AWS in 30 Minutes – Learn how to switch to cloud backups easily with AWS Storage Gateway.

November 20, 2018 | 09:00 AM – 10:00 AM PTDeep Dive on Amazon S3 Security and Management – Amazon S3 provides some of the most enhanced data security features available in the cloud today, including access controls, encryption, security monitoring, remediation, and security standards and compliance certifications.

Store, Protect, Optimize Your Healthcare Data with AWS: Part 2

Post Syndicated from Stephen Jepsen original https://aws.amazon.com/blogs/architecture/store-protect-optimize-your-healthcare-data-with-aws-part-2/

Leveraging Analytics and Machine Learning Tools for Readmissions Prediction

This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.

In Part 1, we looked at various options to ingest and store sensitive healthcare data using AWS. The post described our shared responsibility model and provided a reference architecture that healthcare organizations could use as a foundation to build a robust platform on AWS to store and protect their sensitive data, including protected health information (PHI). In Part 2, we will dive deeper into how customers can optimize their healthcare datasets for analytics and machine learning (ML) to address clinical and operational challenges.

There are a number of factors creating pressures for healthcare organizations, both providers and payers, to adopt analytic tools to better understand their data: regulatory requirements, changing reimbursement models from volume- to value-based care, population health management for risk-bearing organizations, and movement toward personalized medicine. As organizations deploy new solutions to address these areas, the availability of large and complex datasets from electronic health records, genomics, images (for example, CAT, PET, MRI, ultrasound, X-ray), and IoT has been increasing. With these data assets growing in size, healthcare organizations want to leverage analytic and ML tools to derive new actionable insights across their departments.

One example of the use of ML in healthcare is diagnostic image analysis, including digital pathology. Pathology is extremely important in diagnosing and treating patients, but it is also extremely time-consuming and largely a manual process. While the complexity and quantity of workloads are increasing, the number of pathologists is decreasing. According to one study, the number of active pathologists could drop by 30 percent by 2030 compared to 2010 levels. (1) A cloud architecture and solution can automate part of the workflow, including sample management, analysis, storing, sharing, and comparison with previous samples to complement existing provider workflows effectively. A recent study using deep learning to analyze metastatic breast cancer tissue samples resulted in an approximately 85% reduction in human error rate. (2)

ML is also being used to assist radiologists in examining other diagnostic images such as X-rays, MRIs, and CAT scans. Having large quantities of images and metadata to train the algorithms that are the key to ML is one of the main challenges for ML adoption. To help address this problem, the National Institutes of Health recently released 90,000 X-ray plates tagged either with one of 14 diseases or tagged as being normal. Leading academic medical centers are using these images to build their neural networks and train their algorithms. With advanced analytics and ML, we can answer the hard questions such as “what is the next best action for my patient, the expected outcome, and the cost.”

The foundations for a great analytical layer

Let’s pick up from where we left off in Part 1. We have seen how providers can ingest data into AWS from their data centers and store it securely into different services depending on the type of data. For example:

  1. All object data is stored in Amazon S3, Amazon S3 Infrequent Access, or Amazon Glacier depending on how often they are used.
  2. Data from the provider’s database is either processed and stored as objects in Amazon S3 or aggregated into data marts on Amazon Redshift.
  3. Metadata of the objects on Amazon S3 are maintained in the DynamoDB database.
  4. Amazon Athena is used to query the objects directly stored on Amazon S3 to address ad hoc requirements.

We will now look at two best practices that are key to building a robust analytical layer using these datasets.

  1. Separating storage and compute: You should not be compelled to scale compute resources just to store more data. The scaling rules of the two layers should be separate.
  2. Leverage the vast array of AWS big data services when it comes to building the analytical platforms instead of concentrating on just a few of them. Remember, one size does not fit all.

Technical overview

In this overview, we will demonstrate how we can leverage AWS big data and ML services to build a scalable analytical layer for our healthcare data. We will use a single source of data stored in Amazon S3 for performing ad hoc analysis using Amazon Athena, integrate it with a data warehouse on Amazon Redshift, build a visual dashboard for some metrics using Amazon QuickSight, and finally build a ML model to predict readmissions using Amazon SageMaker. By not moving the data around and just connecting to it using different services, we avoid building redundant copies of the same data. There are multiple advantages to this approach:

  1. We optimize our storage. Not having redundant copies reduces the amount of storage required.
  2. We keep the data secure with only authorized services having access to it. Keeping multiple copies of the data can result in higher security risk.
  3. We are able to scale the storage and compute separately as needed.
  4. It becomes easier to manage the data and monitor usage metrics centrally such as how often the data has been accessed, who has been accessing it, and what has been the growth pattern of the data over a period of time. These metrics can be difficult to aggregate if the data is duplicated multiple times.

Let’s build out this architecture using the following steps:

  1. Create a database in AWS Glue Data Catalog

We will do this using a Glue crawler. First create a JSON file that contains the parameters for the Glue crawler.

"Name": "readmissions",
"Role": "arn of the role for Glue",
"DatabaseName": "readmissions",
"Description": "glue data catalog for storing readmission data",
"Targets": {
"S3Targets": [
"Path": "s3://<bucket>/<prefix>"
"Path": "s3://<bucket>/<prefix>"

As you can see, the crawler will crawl two locations in Amazon S3 and save the resulting tables in a new database called “readmissions.” Replace the role ARN and Amazon S3 locations with your corresponding details. Save this in a file create_crawler.json. Then from the AWS CLI, call the following command to create the crawler:

aws glue create-crawler --cli-input-json file://create_crawler.json

Once the crawler is created, run it by calling the following command:

aws glue start-crawler --name readmissions

Log on to the AWS Glue console, navigate to the crawlers, and wait until the crawler completes running.

This will create two tables — phi and non-phi — in a database named “readmissions” in the AWS Glue Data Catalog as shown below.

  1. Query the data using Athena

The Amazon Glue Data Catalog is seamlessly integrated with Amazon Athena. For details on how to enable this, see Integration with AWS Glue.

As a result of this integration, the tables created using the Glue crawler can now be queried using Amazon Athena. Amazon Athena allows you to do ad hoc analysis on the dataset. You can do exploratory analysis on the data and also determine its structure and quality. This type of upfront ad hoc analysis is invaluable for ensuring the data quality in your downstream data warehouse or your ML algorithms that will make use of this data for training models. In the next few sections, we will explore these aspects in greater detail.

To query the data using Amazon Athena, navigate to the Amazon Athena console.

NOTE: Make sure the region is the same as the region you chose in the previous step. If it’s not the same, switch the region by using the drop-down menu on the top right-hand corner of the screen.

Once you arrive in the Amazon Athena console, you should already see the tables and databases you created previously, and you should be able to see the data in the two tables by writing Amazon Athena queries. Here is a list of the top 10 rows from the table readmissions.nonphi:

Now that we are able to query the dataset, we can run some queries for exploratory analysis. Here are just a few examples:

AnalysisAmazon Athena Query
How many Patients have been discharged to home?SELECT count(*) from nonphi where discharge_disposition = ‘Discharged to home’
What’s the minimum and the maximum number of procedures carried out on a patient?SELECT min(num_procedures), max(num_procedures) from nonphi
How many patients were referred to this hospital by another physician?SELECT count(*) FROM nonphi group by admission_source having admission_source = ‘Physician Referral’
What were the top 5 specialties with positive readmissions?

SELECT count(readmission_result) as num_readmissions, medical_specialty from

(select readmission_result,medical_specialty from nonphi where readmission_result = ‘Yes’)

group by medical_specialty order by num_readmissions desc limit 5

Which payer was responsible for paying for treatments that involved more than 5 procedures?SELECT distinct payer_code from nonphi where num_procedures >5 and payer_code !='(null)’

While this information is valuable, you typically do not want to invest too much time and effort into building an ad hoc query platform like this because at this stage, you are not even sure if the data is of any value for your business-critical analytical applications. One benefit of using Amazon Athena for ad hoc analysis is that it requires little effort or time. It uses Schema-On-Read instead of schema on write, allowing you to work with various source data formats without worrying about the underlying structures. You can put the data on Amazon S3 and start querying immediately.

  1. Create an external table in Amazon Redshift Spectrum with the same data

Now that we are satisfied with the data quality and understand the structure of the data, we would like to integrate this with a data warehouse. We’ll use Amazon Redshift Spectrum to create external tables on the files in S3 and then integrate these external tables with a physical table in Amazon Redshift.

Amazon Redshift Spectrum allows you to run Amazon Redshift SQL queries against data on Amazon S3, extending the capabilities of your data warehouse beyond the physical Amazon Redshift clusters. You don’t need to do any elaborate ETL or move the data around. The data exists in one place in Amazon S3 and you interface with it using different services (Athena and Redshift Spectrum) to satisfy different requirements.

Before beginning, please look at this step by step guide to set up Redshift Spectrum.

After you have set up Amazon Redshift Spectrum, you can begin executing the steps below:

  1. Create an external schema called “readmissions.” Amazon Redshift Spectrum integrates with the Amazon Glue Data Catalog and allows you to create spectrum tables by referring the catalog. This feature allows you to build the external table on the same data that you analyzed with Amazon Athena in the previous step without the need for ETL. This can be achieved by the following:
create external schema readmissions
from data catalog
database 'readmissions'
iam_role 'arn for your redshift spectrum role '
region ‘region when the S3 data exists’;

NOTE: Make sure you select the appropriate role arn and region.

  1. Once the command executes successfully, you can confirm the schema was created by running the following:
select * from svv_external_schemas;

You should see a row similar to the one above with your corresponding region and role.

You can also see the external tables that were created by running the following command:

select * from SVV_EXTERNAL_TABLES;

  1. Let’s confirm we can see all the rows in the external table by counting the number of rows:
select count(*) from readmissions.phi;
select count(*) from readmissions.nonphi;

You should see 101,766 rows in both the tables, confirming that your external tables contain all the records that you read using the AWS Glue data crawler and analyzed using Athena.

  1. Now that we have all the external tables created, let’s create an aggregate fact table in the physical Redshift data warehouse. We can use the “As Select” clause of the Redshift create table query to do this:
create table readmissions_aggregate_fact as
avg(time_in_hospital) as avg_time_in_hospital,
min(num_procedures) as min_procedures,
max(num_procedures) as max_procedures,
avg(num_procedures) as avg_num_procedures,
avg(num_medications) as avg_num_medications,
avg(number_outpatient) as avg_number_outpatient,
avg(number_emergency) as avg_number_emergency,
avg(number_inpatient) as avg_number_inpatient,
avg(number_diagnoses) as avg_number_diagnoses
from readmissions.nonphi
group by readmission_result,admission_type,discharge_disposition,diabetesmed

Once this query executes successfully, you can see a new table created in the physical public schema of your Amazon Redshift cluster. You can confirm this by executing the following query:

select distinct(tablename) from pg_table_def where schemaname = 'public'

  1. Build a QuickSight Dashboard from the aggregate fact

We can now create dashboards to visualize the data in our readmissions aggregate fact table using Amazon QuickSight. Here are some examples of reports you can generate using Amazon QuickSight on the readmission data.

For more details on Amazon QuickSight, refer to the service documentation.

  1. Build a ML model in Amazon SageMaker to predict readmissions

As a final step, we will create a ML model to predict the attribute readmission_result, which denotes if a patient was readmitted or not, using the non-PHI dataset.

  1. Create a notebook instance in Amazon SageMaker that is used to develop our code.
  2. The code reads non-PHI data from the Amazon S3 bucket as a data frame in Python. This is achieved using the pandas.readcsv function.

  1. Use the pandas.get_dummies function to encode categorical values into numeric values for use with the model.

  1. Split the data into two, 80% for training and 20% for testing, using the numpy.random.rand function.

  1. Form train_X, train_y and test_X, test_y corresponding to training features, training labels, testing features, and testing labels respectively.

  1. Use the Amazon SageMaker Linear learner algorithm to train our model. The implementation of the algorithm uses dense tensor format to optimize the training job. Use the function write_numpy_to_dense_tensor from the Amazon SageMaker library to convert the numpy array into the dense tensor format.

  1. Create the training job in Amazon SageMaker with appropriate configurations and run it.

  1. Once the training job completes, create an endpoint in Amazon SageMaker to host our model, using the linear.deploy function to deploy the endpoint.

  1. Finally, run a prediction by invoking the endpoint using the linear_predictor.predict function.

You can view the complete notebook here.

Data, analytics, and ML are strategic assets to help you manage your patients, staff, equipment, and supplies more efficiently. These technologies can also help you be more proactive in treating and preventing disease. Industry luminaries share this opinion: “By leveraging big data and scientific advancements while maintaining the important doctor-patient bond, we believe we can create a health system that will go beyond curing disease after the fact to preventing disease before it strikes by focusing on health and wellness,” writes Lloyd B. Minor, MD, dean of the Stanford School of Medicine.

ML and analytics offer huge value in helping achieve the quadruple aim : improved patient satisfaction, improved population health, improved provider satisfaction, and reduced costs. Technology should never replace the clinician but instead become an extension of the clinician and allow them to be more efficient by removing some of the mundane, repetitive tasks involved in prevention, diagnostics, and treatment of patients.

(1) “The Digital Future of Pathology.” The Medical Futurist, 28 May 2018, medicalfuturist.com/digital-future-pathology.

(2) Wang, Dayong, et al. “Deep Learning for Identifying Metastatic Breast Cancer.” Deep Learning for Identifying Metastatic Breast Cancer, 18 June 2016, arxiv.org/abs/1606.05718.

About the Author

Stephen Jepsen is a Global HCLS Practice Manager in AWS Professional Services.


Amazon Kinesis Data Streams Adds Enhanced Fan-Out and HTTP/2 for Faster Streaming

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/

A few weeks ago, we launched two significant performance improving features for Amazon Kinesis Data Streams (KDS): enhanced fan-out and an HTTP/2 data retrieval API. Enhanced fan-out allows developers to scale up the number of stream consumers (applications reading data from a stream in real-time) by offering each stream consumer its own read throughput. Meanwhile, the HTTP/2 data retrieval API allows data to be delivered from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios. These new features enable developers to build faster, more reactive, highly parallel, and latency-sensitive applications on top of Kinesis Data Streams.

Kinesis actually refers to a family of streaming services: Kinesis Video Streams, Kinesis Data Firehose, Kinesis Data Analytics, and the topic of today’s blog post, Kinesis Data Streams (KDS). Kinesis Data Streams allows developers to easily and continuously collect, process, and analyze streaming data in real-time with a fully-managed and massively scalable service. KDS can capture gigabytes of data per second from hundreds of thousands of sources – everything from website clickstreams and social media feeds to financial transactions and location-tracking events.

Kinesis Data Streams are scaled using the concept of a shard. One shard provides an ingest capacity of 1MB/second or 1000 records/second and an output capacity of 2MB/second. It’s not uncommon for customers to have thousands or tens of thousands of shards supporting 10s of GB/sec of ingest and egress. Before the enhanced fan-out capability, that 2MB/second/shard output was shared between all of the applications consuming data from the stream. With enhanced fan-out developers can register stream consumers to use enhanced fan-out and receive their own 2MB/second pipe of read throughput per shard, and this throughput automatically scales with the number of shards in a stream. Prior to the launch of Enhanced Fan-out customers would frequently fan-out their data out to multiple streams to support their desired read throughput for their downstream applications. That sounds like undifferentiated heavy lifting to us, and that’s something we decided our customers shouldn’t need to worry about. Customers pay for enhanced fan-out based on the amount of data retrieved from the stream using enhanced fan-out and the number of consumers registered per-shard. You can find additional info on the pricing page.

Before we jump into a description of the new API, let’s cover a few quick notes about HTTP/2 and how we use that with the new SubscribeToShard API.


HTTP/2 is a major revision to the HTTP network protocol that introduces a new method for framing and transporting data between clients and servers. It’s a binary protocol. It enables many new features focused on decreasing latency and increasing throughput. The first gain is the use of HPACK to compress headers. Another useful feature is connection multiplexing which allows us to use a single TCP connection for multiple parallel non-blocking requests. Additionally, instead of the traditional request-response semantics of HTTP, the communication pipe is bidirectional. A server using HTTP/2 can push multiple responses to a client without waiting for the client to request those resources. Kinesis’s SubscribeToShard API takes advantage of this server push feature to receive new records and makes use of another HTTP/2 feature called flow control. Kinesis pushes data to the consumer and keeps track of the number of bytes that have been unacknowledged. The client acknowledges bytes received by sending WINDOW_UPDATE frames to the server. If the client can’t handle the rate of data, then Kinesis will pause the flow of data until a new WINDOW_UPDATE frame is received or until the 5 minute subscription expires.

Now that we have a grasp on SubscribeToShard and HTTP/2 let’s cover how we use this to take advantage of enhanced fan-out!

Using Enhanced Fan-out

The easiest way to make use of enhanced fan-out is to use the updated Kinesis Client Library 2.0 (KCL). KCL will automatically register itself as a consumer of the stream. Then KCL will enumerate the shards and subscribe to them using the new SubscribeToShard API. It will also continuously call SubscribeToShard whenever the underlying connections are terminated. Under the hood, KCL handles checkpointing and state management of a distributed app with a Amazon DynamoDB table it creates in your AWS account. You can see an example of this in the documentation.

The general process for using enhanced fan-out is:

  1. Call RegisterStreamConsumer and provide the StreamARN and ConsumerName (commonly the application name). Save the ConsumerARN returned by this API call. As soon as the consumer is registered, enhanced fan-out is enabled and billing for consumer-shard-hours begins.
  2. Enumerate stream shards and call SubscribeToShard on each of them with the ConsumerARN returned by RegisterStreamConsumer. This establishes an HTTP/2 connection, and KDS will push SubscribeToShardEvents to the listening client. These connections are terminated by KDS every 5 minutes, so the client will need to call SubscribeToShard again if you want to continue receiving events. Bytes pushed to the client using enhanced fan-out are billed under enhanced fan-out data retrieval rates.
  3. Finally, remember to call DeregisterStreamConsumer when you’re no longer using the consumer since it does have an associated cost.

You can see some example code walking through this process in the documentation.

You can view Amazon CloudWatch metrics and manage consumer applications in the console, including deregistering them.

Available Now

Enhanced fan-out and the new HTTP/2 SubscribeToShard API are both available now in all regions for new streams and existing streams. There’s a lot more information than what I’ve covered in this blog post in the documentation. There is a per-stream limit of 5 consumer applications (e.g., 5 different KCL applications) reading from all shards but this can be increased with a  support ticket. I’m excited to see customers take advantage of these new features to reduce the complexity of managing multiple stream consumers and to increase the speed and parallelism of their real-time applications.

As always feel free to leave comments below or on Twitter.


Viewing Amazon Elasticsearch Service Error Logs

Post Syndicated from Kevin Fallis original https://aws.amazon.com/blogs/big-data/viewing-amazon-elasticsearch-service-error-logs/

Today, Amazon Elasticsearch Service (Amazon ES) announces support for publishing error logs to Amazon CloudWatch Logs.  This new feature provides you with the ability to capture error logs so you can access information about errors and warnings raised during the operation of the service. These details can be useful for troubleshooting. You can then use this information to work with your users to identify patterns that cause error or warning scenarios on your domain.

Access to the feature is enabled as soon as your domain is created.

You can turn the logs on and off at will, paying only for the CloudWatch charges based on their usage.

Set up delivery of error logs for your domain

To enable error logs for an active domain, sign in to the AWS Management Console and choose Elasticsearch Service. On the Amazon ES console, choose your domain name in the list to open its dashboard. Then choose the Logs tab.

In this pane, you configure your Amazon ES domain to publish search slow logs, indexing slow logs, and error logs to a CloudWatch Logs log group. You can find more information on setting up slow logs in the blog post Viewing Amazon Elasticsearch Service Slow Logs on the AWS Database Blog.

Under Set up Error Logs, choose Setup.

You can choose to Create new log group or Use existing log group. We recommend naming your log group as a path, such as:


This naming scheme makes it easier to apply a CloudWatch access policy, in which you can grant permissions to all log groups under a specific path, such as:


To deliver logs to your CloudWatch Logs group, you need to specify a policy for Amazon ES so it can publish to CloudWatch Logs on your behalf.  You can choose to Create a new policy or Select an existing policy. You can accept the policy as is. Or, if your log group names are paths, you can widen the Resource—for example:


You can then reuse this policy for all your domains.

Once you have saved the policy for the domain, Choose Enable, and you have completed setup. Your domain can now send error logs to CloudWatch Logs.

Now that you have enabled the publishing of error logs, you can start monitoring them.

Types of events captured

Elasticsearch uses Apache Log4j 2 and its built-in log levels (from least to most severe) of TRACE, DEBUG, INFO, WARN, ERROR, and FATAL. After you enable error logs, Amazon ES publishes log lines of WARN, ERROR, and FATAL to CloudWatch. Less severe levels (INFO, DEBUG and TRACE) are not available.

Based on this, you can expect to find details for events such as the ones highlighted in the following list.

  • Rejects based on exceeding the configured highlight.max_analyzed_offset parameter limit
  • Painless script compilation issues in a request
  • Detailed information about invalid requests and invalid query formats
  • GC cycles
  • Detailed information about write blocks
  • Issues encountered during snapshot exercises

View your log data

To see your log data, sign in to the AWS Management Console, and open the CloudWatch console. In the left navigation pane, choose the Logs tab. Find your log group in the list of groups and open the log group. Your log group name is the Name that you set when you set up logging in the Amazon ES wizard.

Within your log group, you should see a number of log streams.

Amazon ES creates es-test-log-stream during setup of error logs to ensure that it can write to CloudWatch Logs. This stream contains only a single test message.

Your application error logs arrive within 30 minutes and have long hex names, suffixed by es-application-logs to indicate the source of the log data. Choose one of these to view events based on the last event time.

You should see individual entries for each event in timestamp order. To switch from less granular detail to highly granular detail on the event log entry, you can use a toggle at the top right of the CloudWatch Logs console. The format is a timestamp, the locus, the node generating the error or warning, and text cleansed of any specifics on the cluster itself such as you see in this stack trace.


By enabling the error logs feature, you can gain more insight into issues with your Amazon ES domains and identify issues with domain configurations.  Additionally, you can also use the integration of CloudWatch Logs and Amazon ES to send application error logs to a different Amazon ES domain and monitor your domain’s performance.

About the Author

Kevin Fallis is an AWS solutions architect specializing in search technologies.





Amazon Kinesis Video Streams Adds Support For HLS Output Streams

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/amazon-kinesis-video-streams-adds-support-for-hls-output-streams/

Today I’m excited to announce and demonstrate the new HTTP Live Streams (HLS) output feature for Amazon Kinesis Video Streams (KVS). If you’re not already familiar with KVS, Jeff covered the release for AWS re:Invent in 2017. In short, Amazon Kinesis Video Streams is a service for securely capturing, processing, and storing video for analytics and machine learning – from one device or millions. Customers are using Kinesis Video with machine learning algorithms to power everything from home automation and smart cities to industrial automation and security.

After iterating on customer feedback, we’ve launched a number of features in the past few months including a plugin for GStreamer, the popular open source multimedia framework, and docker containers which make it easy to start streaming video to Kinesis. We could talk about each of those features at length, but today is all about the new HLS output feature! Fair warning, there are a few pictures of my incredibly messy office in this post.

HLS output is a convenient new feature that allows customers to create HLS endpoints for their Kinesis Video Streams, convenient for building custom UIs and tools that can playback live and on-demand video. The HLS-based playback capability is fully managed, so you don’t have to build any infrastructure to transmux the incoming media. You simply create a new streaming session, up to 5 (for now), with the new GetHLSStreamingSessionURL API and you’re off to the races. The great thing about HLS is that it’s already an industry standard and really easy to leverage in existing web-players like JW Player, hls.js, VideoJS, Google’s Shaka Player, or even rendering natively in mobile apps with Android’s Exoplayer and iOS’s AV Foundation. Let’s take a quick look at the API, feel free to skip to the walk-through below as well.

Kinesis Video HLS Output API

The documentation covers this in more detail than what we can go over in the Blog but I’ll cover the broad components.

  1. Get an endpoint with the GetDataEndpoint API
  2. Use that endpoint to get an HLS streaming URL with the GetHLSStreamingSessionURL API
  3. Render the content in the HLS URL with whatever tools you want!

This is pretty easy in a Jupyter notebook with a quick bit of Python and boto3.

import boto3
STREAM_NAME = "RandallDeepLens"
kvs = boto3.client("kinesisvideo")
# Grab the endpoint from GetDataEndpoint
endpoint = kvs.get_data_endpoint(
# Grab the HLS Stream URL from the endpoint
kvam = boto3.client("kinesis-video-archived-media", endpoint_url=endpoint)
url = kvam.get_hls_streaming_session_url(

You can even visualize everything right away in Safari which can render HLS streams natively.

from IPython.display import HTML
HTML(data='<video src="{0}" autoplay="autoplay" controls="controls" width="300" height="400"></video>'.format(url)) 

We can also stream directly from a AWS DeepLens with just a bit of code:

import DeepLens_Kinesis_Video as dkv
import time
aws_access_key = "super_fake"
aws_secret_key = "even_more_fake"
region = "us-east-1"
stream_name ="RandallDeepLens"
retention = 1 #in minutes.
wait_time_sec = 60*300 #The number of seconds to stream the data
# will create the stream if it does not already exist
producer = dkv.createProducer(aws_access_key, aws_secret_key, "", region)
my_stream = producer.createStream(stream_name, retention)

How to use Kinesis Video Streams HLS Output Streams

We definitely need a Kinesis Video Stream, which we can create easily in the Kinesis Video Streams Console.

Now, we need to get some content into the stream. We have a few options here. Perhaps the easiest is the docker container. I decided to take the more adventurous route and compile the GStreamer plugin locally on my mac, following the scripts on github. Be warned, compiling this plugin takes a while and can cause your computer to transform into a space heater.

With our freshly compiled GStreamer binaries like gst-launch-1.0 and the kvssink plugin we can stream directly from my macbook’s webcam, or any other GStreamer source, into Kinesis Video Streams. I just use the kvssink output plugin and my data will wind up in the video stream. There are a few parameters to configure around this, so pay attention.

Here’s an example command that I ran to stream my macbook’s webcam to Kinesis Video Streams:

gst-launch-1.0 autovideosrc ! videoconvert \
! video/x-raw,format=I420,width=640,height=480,framerate=30/1 \
! vtenc_h264_hw allow-frame-reordering=FALSE realtime=TRUE max-keyframe-interval=45 bitrate=500 \
! h264parse \
! video/x-h264,stream-format=avc,alignment=au,width=640,height=480,framerate=30/1 \
! kvssink stream-name="BlogStream" storage-size=1024 aws-region=us-west-2 log-config=kvslog

Now that we’re streaming some data into Kinesis, I can use the getting started sample static website to test my HLS stream with a few different video players. I just fill in my AWS credentials and ask it to start playing. The GetHLSStreamingSessionURL API supports a number of parameters so you can play both on-demand segments and live streams from various timestamps.

Additional Info

Data Consumed from Kinesis Video Streams using HLS is charged $0.0119 per GB in US East (N. Virginia) and US West (Oregon) and pricing for other regions is available on the service pricing page. This feature is available now, in all regions where Kinesis Video Streams is available.

The Kinesis Video team told me they’re working hard on getting more integration with the AWS Media services, like MediaLive, which will make it easier to serve Kinesis Video Stream content to larger audiences.

As always, let us know what you you think on twitter or in the comments. I’ve had a ton of fun playing around with this feature over the past few days and I’m excited to see customers build some new tools with it!