Tag Archives: Amazon Kinesis

How Wind Mobility built a serverless data architecture

Post Syndicated from Pablo Giner original https://aws.amazon.com/blogs/big-data/how-wind-mobility-built-a-serverless-data-architecture/

Guest post by Pablo Giner, Head of BI, Wind Mobility.

Over the past few years, urban micro-mobility has become a trending topic. With pollution indexes hitting historic highs, cities and companies worldwide have been introducing regulations and working on a wide spectrum of solutions to alleviate the situation.

We at Wind Mobility strive to make commuters’ lives more sustainable and convenient by bringing short-distance urban transportation to cities worldwide.

At Wind Mobility, we scale our services at the same pace as our users demand them, and we do it in an economically and environmentally viable way. We optimize our fleet distribution to avoid overcrowding cities with more scooters than those that are actually going to be used, and we position them just meters away from where our users need them and at the time of the day when they want them.

How do we do that? By optimizing our operations to their fullest. To do so, we need to be very well informed about our users’ behavior under varying conditions and understand our fleet’s potential.

Scalability and flexibility for rapid growth

We knew that before we could solve this challenge, we needed to collect data from many different sources, such as user interactions with our application, user demand, IoT signals from our scooters, and operational metrics. To analyze the numerous datasets collected and extract actionable insights, we needed to build a data lake. While the high-level goal was clear, the scope was less so. We were working hard to scale our operation as we continued to launch in new markets. The rapid growth and expansion made it very difficult to predict the volume of data we would need to consume. We were also launching new microservices to support our growth, which resulted in more data sources to ingest. We needed an architecture that allowed us to be agile and adapt quickly to keep up with our growth. It became clear that a serverless architecture was best positioned to meet those needs, so we started to design our 100% serverless infrastructure.

The first challenge was ingesting and storing data from our scooters in the field, events from our mobile app, operational metrics, and partner APIs. We use AWS Lambda to capture changes in our operational databases and mobile app and push the events to Amazon Kinesis Data Streams, which allows us to take action in real time. We also use Amazon Kinesis Data Firehose to write the data to Amazon Simple Storage Service (Amazon S3), which we use for analytics.
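As a rough illustration of the Lambda-to-Kinesis step, here is a minimal sketch of how change events could be shaped into entries for the Kinesis `PutRecords` call. The stream name `scooter-events`, the `scooter_id` field, and the `event["changes"]` input shape are hypothetical and are not taken from Wind Mobility's code. The client is passed in so the sketch runs without AWS access.

```python
import json


def build_put_records_entries(events, key_field="scooter_id"):
    """Turn a list of event dicts into entries for Kinesis PutRecords."""
    entries = []
    for event in events:
        entries.append({
            # Kinesis expects the payload as bytes.
            "Data": json.dumps(event).encode("utf-8"),
            # Records that share a partition key are routed to the same shard.
            "PartitionKey": str(event[key_field]),
        })
    return entries


def handler(event, context, kinesis_client=None, stream_name="scooter-events"):
    """Hypothetical Lambda entry point; event["changes"] is an assumed shape."""
    entries = build_put_records_entries(event["changes"])
    if kinesis_client is not None:
        # With boto3, the client would come from boto3.client("kinesis").
        kinesis_client.put_records(StreamName=stream_name, Records=entries)
    return {"forwarded": len(entries)}


sample = {"changes": [{"scooter_id": "s-1", "battery": 87},
                      {"scooter_id": "s-2", "battery": 54}]}
result = handler(sample, None)
print(result)
```

Using the scooter identifier as the partition key is one possible choice: it keeps the events of a single scooter in order within a shard.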

After the data was in Amazon S3 and adequately partitioned for its most common use cases (we partition by date, region, and business line, depending on the data source), we had to find a way to query this data for both data profiling (understanding structure, content, and interrelationships) and ad hoc analysis. For that we chose AWS Glue crawlers to catalog our data and Amazon Athena to read from the AWS Glue Data Catalog and run queries. However, ad hoc analysis and data profiling are relatively sporadic tasks in our team, because most of the data processing computing hours are actually dedicated to transforming the multiple data sources into our data warehouse: consolidating the raw data, modeling it, adding new attributes, and picking the data elements that constitute 95% of our analytics and predictive needs.
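To make the partitioning idea concrete, the following sketch builds an S3 object key partitioned by date, region, and business line. The `key=value` folder layout (the Hive-style convention that AWS Glue crawlers can recognize as partitions), the source name, and the sample values are all assumptions for illustration. The post does not show the actual key layout.

```python
from datetime import date


def partitioned_key(source, day, region, business_line, filename):
    """Build an S3 key with one key=value folder per partition column."""
    parts = [
        source,
        "date={}".format(day.isoformat()),
        "region={}".format(region),
        "business_line={}".format(business_line),
        filename,
    ]
    return "/".join(parts)


key = partitioned_key("scooter-events", date(2020, 4, 16),
                      "eu-west", "sharing", "part-0000.parquet")
print(key)
```

With a layout like this, a query that filters on date or region only has to scan the matching folders.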

This is where all the heavy lifting takes place. We parse through millions of scooter and user events generated daily (over 300 events per second) to extract actionable insight. We selected AWS Glue to perform this task. Our primary ETL job reads the newly added raw event data from Amazon S3, processes it using Apache Spark, and writes the results to our Amazon Redshift data warehouse. AWS Glue plays a critical role in our ability to scale on demand. After careful evaluation and testing, we concluded that AWS Glue ETL jobs meet all our needs and free us from procuring and managing infrastructure.

Architecture overview

The following diagram represents our current data architecture, showing two serverless data collection, processing, and reporting pipelines:

  • Operational databases from Amazon Relational Database Service (Amazon RDS) and MongoDB
  • IoT and application events, followed by Athena for data profiling and Amazon Redshift for reporting

Our data is curated and transformed multiple times a day using an automated pipeline running on AWS Glue. The team can now focus on analyzing the data and building machine learning (ML) applications.

We chose Amazon QuickSight as our business intelligence tool to help us visualize and better understand our operational KPIs. Additionally, we use Amazon Elastic Container Registry (Amazon ECR) to store our Docker images containing our custom ML algorithms and Amazon Elastic Container Service (Amazon ECS) where we train, evaluate, and host our ML models. We schedule our models to be trained and evaluated multiple times a day. Taking as input curated data about demand, conversion, and flow of scooters, we run the models to help us optimize fleet utilization for a particular city at any given time.

The following diagram represents how data from the data lake is incorporated into our ML training, testing, and serving system. First, our developers work in the application code and commit their changes, which are built into new Docker images by our CI/CD pipeline and stored in the Amazon ECR registry. These images are pushed into Amazon ECS and tested in DEV and UAT environments before moving to PROD (where they are triggered by the Amazon ECS task scheduler). During their execution, the Amazon ECS tasks (some train the demand and usage forecasting models, some produce the daily and hourly predictions, and others optimize the fleet distribution to satisfy the forecast) read their configuration and pull data from Amazon S3 (which has been previously produced by scheduled AWS Glue jobs), finally storing their results back into Amazon S3. Executions of these pipelines are tracked via MLFlow (in a dedicated Amazon Elastic Compute Cloud (Amazon EC2) server) and the final result indicating the fleet operations required is fit into a Kepler map, which is then consumed by the operators on the field.

Conclusion

We at Wind Mobility place data at the forefront of our operations. For that, we need our data infrastructure to be as flexible as the industry and the context we operate in, which is why we chose serverless. Over the course of a year, we have built a data lake, a data warehouse, a BI suite, and a variety of (production) data science applications. All of that with a very small team.

Also, within the last 12 months, we have scaled up several of our data pipelines by a factor of 10, without slowing our momentum or redesigning any part of our architecture. When we had to double our fleet in 1 week and increase the frequency at which we capture data from scooters by a factor of 10, our serverless data architecture scaled with no issues. This allowed us to focus on adding value by simplifying our operation, reacting to changes quickly, and delighting our users.

We have measured our success in multiple dimensions:

  • Speed – Serverless is faster to deploy and expand; we believe we have reduced our time to market for the entire infrastructure by a factor of 2
  • Visibility – We have 360 degree visibility of our operations worldwide, accessible by our city managers, finance team, and management board
  • Optimized fleet deployment – We know, at any minute of the day, the number of scooters that our customers need over the next few hours, which reduces unsatisfied demand by more than 50%

If you face a similar challenge, our advice is clear: go fully serverless and use the spectrum of solutions available from AWS.

Follow us and discover more about Wind Mobility on Facebook, Instagram and LinkedIn.

 


About the Author

Pablo Giner is Head of BI at Wind Mobility. Pablo’s background is in wheels (motorcycle racing > vehicle engineering > collision insurance > eScooters sharing…) and for the last few years he has specialized in forming and developing data teams. At Wind Mobility, he leads the data function (data engineering + analytics + data science), and the project he is most proud of is what they call smart fleet rebalancing, an AI-backed solution to reposition their fleet in real time. “In God we trust. All others must bring data.” – W. Edwards Deming

 

 

 

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and of the impact of certain design decisions. To address the need to provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction to its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come online, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

 


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

New – Serverless Streaming ETL with AWS Glue

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-serverless-streaming-etl-with-aws-glue/

When you have applications in production, you want to understand what is happening, and how the applications are being used. To analyze data, a first approach is a batch processing model: a set of data is collected over a period of time, then run through analytics tools. To be able to react quickly, you can use a streaming model, where data is processed as it arrives, a record at a time or in micro-batches of tens, hundreds, or thousands of records.
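The difference between processing one record at a time and processing in micro-batches can be sketched in a few lines of plain Python. This is only an illustration of the idea, not how AWS Glue or Spark implement it. The helper name `micro_batches` and the sample readings are made up for the example.

```python
from itertools import islice


def micro_batches(records, batch_size):
    """Yield consecutive lists of up to batch_size records from an iterable."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


readings = [20.5, 21.0, 21.4, 22.1, 22.3]

# Group the readings two at a time; the last batch may be smaller.
batches = list(micro_batches(readings, 2))
averages = [sum(batch) / len(batch) for batch in batches]
print(batches)
```

A batch size of 1 corresponds to processing a record at a time. Larger batches trade a little latency for fewer, larger processing steps.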

Managing continuous ingestion pipelines and processing data on-the-fly is quite complex, because it’s an always-on system that needs to be managed, patched, scaled, and generally taken care of. Today, we are making this easier and more cost-effective to implement by extending AWS Glue jobs, based on Apache Spark, to run continuously and consume data from streaming platforms such as Amazon Kinesis Data Streams and Apache Kafka (including the fully-managed Amazon MSK).

In this way, Glue can provision, manage, and scale the infrastructure needed to ingest data to data lakes on Amazon S3, data warehouses such as Amazon Redshift, or other data stores. For example, you can store streaming data in a DynamoDB table for quick lookups, or in Elasticsearch to look for specific patterns. This procedure is usually referred to as extract, transform, load (ETL).

As you process streaming data in a Glue job, you have access to the full capabilities of Spark Structured Streaming to implement data transformations, such as aggregating, partitioning, and formatting as well as joining with other data sets to enrich or cleanse the data for easier analysis. For example, you can access an external system to identify fraud in real-time, or use machine learning algorithms to classify data, or detect anomalies and outliers.

Processing Streaming Data with AWS Glue
To try this new feature, I want to collect data from IoT sensors and store all data points in an S3 data lake. I am using a Raspberry Pi with a Sense HAT to collect temperature, humidity, barometric pressure, and its position in space in real-time (using the integrated gyroscope, accelerometer, and magnetometer). Here’s an architectural view of what I am building:

First, I register the device with AWS IoT Core, and run the following Python code to send, once per second, a JSON message with sensor data to the streaming-data MQTT topic. I have a single device in this setup. With more devices, I would use a subtopic per device, for example streaming-data/{client_id}.

import sys
import time
import datetime
import json
from sense_hat import SenseHat
from awscrt import io, mqtt
from awsiot import mqtt_connection_builder

sense = SenseHat()

topic = "streaming-data"
client_id = "raspberrypi"

# Callback when the connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))


def collect_and_send_data():
    publish_count = 0
    while True:

        humidity = sense.get_humidity()
        print("Humidity: %s %%rH" % humidity)

        temp = sense.get_temperature()
        print("Temperature: %s C" % temp)

        pressure = sense.get_pressure()
        print("Pressure: %s Millibars" % pressure)

        orientation = sense.get_orientation_degrees()
        print("p: {pitch}, r: {roll}, y: {yaw}".format(**orientation))

        timestamp = datetime.datetime.fromtimestamp(
            time.time()).strftime('%Y-%m-%d %H:%M:%S')

        message = {
            "client_id": client_id,
            "timestamp": timestamp,
            "humidity": humidity,
            "temperature": temp,
            "pressure": pressure,
            "pitch": orientation['pitch'],
            "roll": orientation['roll'],
            "yaw": orientation['yaw'],
            "count": publish_count
        }
        print("Publishing message to topic '{}': {}".format(topic, message))

        mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(message),
            qos=mqtt.QoS.AT_LEAST_ONCE)
        time.sleep(1)
        publish_count += 1


if __name__ == '__main__':
    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint="a1b2c3d4e5f6g7-ats.iot.us-east-1.amazonaws.com",
        cert_filepath="rapberrypi.cert.pem",
        pri_key_filepath="rapberrypi.private.key",
        client_bootstrap=client_bootstrap,
        ca_filepath="root-CA.crt",
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False,
        keep_alive_secs=6)

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe
    print("Subscribing to topic '{}'...".format(topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

    subscribe_result = subscribe_future.result()
    print("Subscribed with {}".format(str(subscribe_result['qos'])))

    collect_and_send_data()

This is an example of the JSON messages sent by the device:

{
    "client_id": "raspberrypi",
    "timestamp": "2020-04-16 11:33:23",
    "humidity": 39.35261535644531,
    "temperature": 30.10732078552246,
    "pressure": 1020.447509765625,
    "pitch": 4.044007304723748,
    "roll": 7.533848064912158,
    "yaw": 77.01560798660883,
    "count": 104
}
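On the consuming side, a message like the one above can be parsed back into Python values. The sketch below is an assumption about how a consumer might do this and is not part of the original walkthrough. The helper name `parse_message` is hypothetical. It converts the timestamp string, written by the device with the `'%Y-%m-%d %H:%M:%S'` format, into a `datetime` object.

```python
import json
from datetime import datetime

raw = (
    '{"client_id": "raspberrypi", "timestamp": "2020-04-16 11:33:23", '
    '"humidity": 39.35261535644531, "temperature": 30.10732078552246, '
    '"pressure": 1020.447509765625, "pitch": 4.044007304723748, '
    '"roll": 7.533848064912158, "yaw": 77.01560798660883, "count": 104}'
)


def parse_message(payload):
    """Decode a sensor message and convert its timestamp to a datetime."""
    message = json.loads(payload)
    # Same format string the device uses when it builds the message.
    message["timestamp"] = datetime.strptime(
        message["timestamp"], "%Y-%m-%d %H:%M:%S")
    return message


message = parse_message(raw)
print(message["timestamp"], message["temperature"])
```

The device code builds the timestamp with `datetime.datetime.fromtimestamp(time.time())`, so the value is the device's local time and carries no time zone information.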

In the Kinesis console, I create the my-data-stream data stream (1 shard is more than enough for my workload). Back in the AWS IoT console, I create an IoT rule to send all data from the MQTT topic to this Kinesis data stream.
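The IoT rule created in the console can also be described programmatically. The sketch below only builds the rule payload in the shape accepted by the AWS IoT `CreateTopicRule` API. It does not call AWS. The role ARN is a placeholder, and using `${newuuid()}` as the partition key is an assumption; the post does not say which partition key the rule uses.

```python
def kinesis_rule_payload(topic, stream_name, role_arn):
    """Build a topic rule payload that forwards every message to Kinesis."""
    return {
        # Select the full payload of every message published to the topic.
        "sql": "SELECT * FROM '{}'".format(topic),
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [{
            "kinesis": {
                "roleArn": role_arn,
                "streamName": stream_name,
                # Assumed choice: a random key per message.
                "partitionKey": "${newuuid()}",
            }
        }],
    }


payload = kinesis_rule_payload(
    topic="streaming-data",
    stream_name="my-data-stream",
    # Placeholder ARN; the role must allow AWS IoT to write to the stream.
    role_arn="arn:aws:iam::123456789012:role/example-iot-to-kinesis",
)
print(payload["sql"])
```

With boto3, a payload like this would be passed as `topicRulePayload` to `create_topic_rule` on the `iot` client, together with a `ruleName`.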

Now that all sensor data is sent to Kinesis, I can leverage the new Glue integration to process data as it arrives. In the Glue console, I manually add a table in the Glue Data Catalog. I select Kinesis as the type of source, and enter my stream name and the endpoint of the Kinesis Data Streams service. Note that for Kafka streams, before creating the table, you need to create a Glue connection.

I select JSON as data format, and define the schema for the streaming data. If I don’t specify a column here, it will be ignored when processing the stream.
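As an illustration of what the schema step amounts to, the sketch below derives a column list from the sample message shown earlier and then drops any field that is not in that list, mirroring the statement that unspecified columns are ignored. The type mapping and the helper names `infer_columns` and `project` are assumptions for this example. The column types chosen in the console may differ (for instance, the timestamp could be declared as a timestamp rather than a string).

```python
# Assumed mapping from Python types to Glue Data Catalog type names.
GLUE_TYPES = {str: "string", int: "int", float: "double", bool: "boolean"}


def infer_columns(sample):
    """Build a Name/Type column list from one sample record."""
    return [{"Name": name, "Type": GLUE_TYPES[type(value)]}
            for name, value in sample.items()]


def project(record, columns):
    """Keep only the fields that are declared in the schema."""
    names = {column["Name"] for column in columns}
    return {key: value for key, value in record.items() if key in names}


sample = {
    "client_id": "raspberrypi",
    "timestamp": "2020-04-16 11:33:23",
    "humidity": 39.35261535644531,
    "temperature": 30.10732078552246,
    "pressure": 1020.447509765625,
    "pitch": 4.044007304723748,
    "roll": 7.533848064912158,
    "yaw": 77.01560798660883,
    "count": 104,
}
columns = infer_columns(sample)

# A field that is not in the schema is dropped.
trimmed = project(dict(sample, firmware="1.2.3"), columns)
print(columns)
```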

After that, I confirm the final recap step, and create the my_streaming_data table. We are working to add schema inference to streaming ETL jobs. With that, specifying the full schema up front won’t be necessary. Stay tuned.

To process the streaming data, I create a Glue job. For the IAM role, I create a new one, attaching the AWSGlueServiceRole and AmazonKinesisReadOnlyAccess managed policies. Depending on your use case and the setup of your AWS accounts, you may want to use a role providing more fine-grained access.

For the data source, I select the table I just created, receiving data from the Kinesis stream.

To get a script generated by Glue, I select the Change schema transform type. As target, I create a new table in the Glue Data Catalog, using an efficient format like Apache Parquet. The Parquet files generated by this job are going to be stored in an S3 bucket whose name starts with aws-glue- (including the final hyphen). By following the naming convention for resources specified in the AWSGlueServiceRole policy, this job has the required permissions to access those resources.

I leave the default mapping, which keeps all the columns of the source stream in the output. In this way, I can ingest all the records using the proposed script, without having to write a single line of code.

I quickly review the proposed script and save. Each record is processed as a DynamicFrame, and I can apply any of the Glue PySpark Transforms or any transforms supported by Spark Structured Streaming. By default with this configuration, only ApplyMapping is used.
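The generated script itself is not reproduced in this post. As a rough mental model, Glue's ApplyMapping takes a list of `(source, source type, target, target type)` tuples. The plain-Python function below imitates that behavior on a single dictionary so that it can run anywhere. It is an illustration under that assumption, not the Glue implementation, and the `CASTS` table and the sample record are made up for the example.

```python
# Assumed casts for a handful of type names used in the mapping tuples.
CASTS = {"string": str, "int": int, "long": int, "double": float}


def apply_mapping(record, mappings):
    """Rename and cast fields; fields without a mapping are dropped."""
    output = {}
    for source, _source_type, target, target_type in mappings:
        if source in record:
            output[target] = CASTS[target_type](record[source])
    return output


mappings = [
    ("client_id", "string", "client_id", "string"),
    ("temperature", "double", "temperature", "double"),
    ("count", "int", "count", "long"),
]

record = {"client_id": "raspberrypi", "temperature": 30.1,
          "count": 104, "yaw": 77.0}
mapped = apply_mapping(record, mappings)
print(mapped)
```

The default mapping described above lists every source column with the same name and type on both sides, so no field is renamed or dropped.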

I start the job, and after a few minutes I see the Parquet files containing the output of the job appearing in the output S3 bucket. They are partitioned by ingest date (year, month, day, and hour).

To populate the Glue Data Catalog with tables based on the content of the S3 bucket, I add and run a crawler. In the crawler configuration, I exclude the checkpoint folder used by Glue to keep track of the data that has been processed. After less than a minute, a new table has been added.

In the Amazon Athena console, I refresh the database and tables, and choose to preview the output_my_data table, which contains the data ingested this year. In this way, I see the first ten records in the table, and get a confirmation that my setup is working!

Now, as data is being ingested, I can run more complex queries. For example, I can get the minimum and maximum temperature, collected from the device sensors, and the overall number of records stored in the Parquet files.
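The exact query is not shown here, but an aggregate of this kind can be sketched as follows. To keep the example runnable without AWS, it uses an in-memory SQLite database as a stand-in for Athena, with three made-up rows. The table name `output_my_data` comes from the walkthrough, while the column list and the values are assumptions.

```python
import sqlite3

QUERY = """
SELECT MIN(temperature) AS min_temp,
       MAX(temperature) AS max_temp,
       COUNT(*) AS records
FROM output_my_data
"""

# SQLite stands in for Athena here; the rows are illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE output_my_data (client_id TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO output_my_data VALUES (?, ?)",
    [("raspberrypi", 29.5), ("raspberrypi", 30.1), ("raspberrypi", 31.0)],
)

min_temp, max_temp, records = conn.execute(QUERY).fetchone()
print(min_temp, max_temp, records)
```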

Looking at the results, I see more than 8,000 records have been processed, with a maximum temperature of 31 degrees Celsius (about 88 degrees Fahrenheit). Actually, it was never really this hot. Temperature is measured by these sensors very close to the device, and is growing as the device is warming up with usage.

I am using a single device in this setup, but the solution implemented here can easily scale up with the number of data sources.

Available Now
Support for streaming sources is available in all regions where Glue is offered, as described in the AWS Region table. For more information, please have a look at the documentation.

Managing a serverless ETL pipeline with Glue makes it easier and more cost-effective to set up and manage streaming ingestion processes, reducing implementation efforts so you can focus on the business outcomes of analytics. You can set up a whole ingestion pipeline without writing code, as I did in this walkthrough, or customize the proposed script based on your needs.

Let me know what you are going to use this new feature for!

Danilo

Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose

Post Syndicated from Tarik Makota original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-into-amazon-elasticsearch-service-within-the-privacy-of-your-vpc-with-amazon-kinesis-data-firehose/

Today we are adding a new Amazon Kinesis Data Firehose feature that sets up VPC delivery from Kinesis Data Firehose to your Amazon Elasticsearch Service domain. If you have been managing a custom application on Amazon Kinesis Data Streams to keep traffic private, you can now use Kinesis Data Firehose and load your data into an Amazon Elasticsearch Service endpoint in a VPC without having to invest in, operate, and scale ingestion and delivery infrastructure. You can start using this new feature from the Kinesis Data Firehose console, AWS CLI, and API by selecting Amazon Elasticsearch Service as the destination, choosing the specific domain with VPC access, and setting the VPC configuration with subnets and the optional security groups.

Before this feature

Amazon Elasticsearch Service domains can have public or private endpoints. Public endpoints are backed by IP addresses on the public internet. Private endpoints are backed by IP addresses within the IP space of your VPC.

If you have been using an Amazon Elasticsearch Service VPC endpoint, you most likely use Kinesis Data Streams or a similar solution to ingest streaming data. This means running a custom application on the stream that delivers the data to the Amazon Elasticsearch Service VPC domain. You likely had to perform the following actions:

  • Implement buffering
  • Convert formats
  • Perform compression
  • Apply transformations
  • Manage backups
  • Handle transient delivery failures

Additionally, you have to build, scale, monitor, update, and maintain this custom application.

Kinesis Data Firehose delivery to Amazon Elasticsearch Service VPC endpoint

Kinesis Data Firehose can now deliver data into an Amazon Elasticsearch Service VPC endpoint. This provides a secure and easy way to ingest, transform, and deliver streaming data. You don’t need to worry about managing your data ingestion and delivery infrastructure. With this new feature, Kinesis Data Firehose enables additional secure communication to Amazon Elasticsearch Service VPC endpoints. Amazon Elasticsearch Service endpoints that live within a VPC give you an extra layer of security.

How it works

When you create a Kinesis Data Firehose delivery stream that delivers data to an Amazon Elasticsearch Service VPC endpoint, Kinesis Data Firehose creates an Elastic Network Interface (ENI) in each subnet you select. If you only use one Availability Zone, Kinesis Data Firehose places an endpoint into only one subnet. Similarly, when you create an Amazon Elasticsearch Service VPC endpoint, it creates endpoints in the subnets you chose. Kinesis Data Firehose uses ENI to deliver the data to your Amazon Elasticsearch Service ENI, all inside your VPC. The following screenshot outlines the resulting architecture with a single subnet.

For this walkthrough, you have two security groups:

  • kdf-sec-grp for your Kinesis Data Firehose endpoint
  • es-sec-grp for your Amazon Elasticsearch Service endpoint

To let Kinesis Data Firehose access your Amazon Elasticsearch Service VPC endpoint, security group es-sec-grp needs to allow the ENI that Kinesis Data Firehose created to make HTTPS calls. Kinesis Data Firehose scales the ENIs automatically to meet the throughput requirements. As Kinesis Data Firehose scales ENIs, the outbound rules of the enclosing security group kdf-sec-grp control the data stream. You should configure the Amazon Elasticsearch Service security group (es-sec-grp) to allow HTTPS traffic from the Kinesis Data Firehose security group (kdf-sec-grp). The Kinesis Data Firehose security group needs to allow outbound HTTPS traffic, and its destination is the Amazon Elasticsearch Service security group. With Kinesis Data Firehose VPC delivery, you do not need to make the Firehose security group open to outside traffic.

You can also use the same security group for Kinesis Data Firehose and Amazon Elasticsearch Service endpoints. If you use the same security group for both, make sure the security group inbound rule allows HTTPS traffic.

For your existing delivery streams, you can change the destination endpoint. The new destination must be accessible within the same VPC, subnets, and security groups. Changing any of the VPC, subnets, or security groups requires you to recreate the delivery stream.

All existing Kinesis Data Firehose limits apply to this capability. For example, you can increase the default 50 delivery streams per account by submitting a quota increase request. Also, Kinesis Data Firehose creates one or more ENIs per VPC destination subnet per delivery stream. Kinesis Data Firehose automatically scales the number of ENIs as needed based on the actual throughput. The default throughput limit per delivery stream is 5 MB/second (dependent on Region). You can request an increase to this limit by submitting a support case.

You need to make sure you have enough ENIs available. By default, VPC has a quota of 5000 ENIs per Region. For more information, see Amazon VPC Quotas.

The advantage of using a managed service like Kinesis Data Firehose is that you can focus on the value of your data and not the underlying plumbing. You can configure the frequency of data delivery from your delivery stream to your Amazon Elasticsearch Service domain. Kinesis Data Firehose buffers incoming data before delivering it to Amazon Elasticsearch Service. You can configure the buffer size (1 MB–100 MB) and the buffer interval (60–900 seconds); whichever condition is satisfied first triggers data delivery. You can also specify a retry duration between 0 and 7,200 seconds when you create the delivery stream. If data delivery to your Amazon Elasticsearch Service endpoint fails, Kinesis Data Firehose retries for the specified duration. After the retry period, Kinesis Data Firehose skips the current batch of data and moves on to the next batch. Skipped documents go to the elasticsearch_failed folder in your Amazon S3 bucket, which you can use for manual backfill.
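The buffering and retry limits described above can be captured in a small validation helper. This is a sketch that assumes the `BufferingHints` and `RetryOptions` key names used by the Firehose API for Elasticsearch destinations. The helper names and the treatment of 1 MB as 1024 × 1024 bytes are assumptions, and `should_flush` is only a model of the "whichever condition is met first" rule, not the service's implementation.

```python
def delivery_settings(size_mb, interval_seconds, retry_seconds):
    """Validate the limits quoted in the post and return the settings."""
    if not 1 <= size_mb <= 100:
        raise ValueError("buffer size must be between 1 and 100 MB")
    if not 60 <= interval_seconds <= 900:
        raise ValueError("buffer interval must be between 60 and 900 seconds")
    if not 0 <= retry_seconds <= 7200:
        raise ValueError("retry duration must be between 0 and 7,200 seconds")
    return {
        "BufferingHints": {"SizeInMBs": size_mb,
                           "IntervalInSeconds": interval_seconds},
        "RetryOptions": {"DurationInSeconds": retry_seconds},
    }


def should_flush(buffered_bytes, seconds_since_flush, hints):
    """Deliver when either the size or the interval threshold is reached."""
    size_reached = buffered_bytes >= hints["SizeInMBs"] * 1024 * 1024
    interval_reached = seconds_since_flush >= hints["IntervalInSeconds"]
    return size_reached or interval_reached


settings = delivery_settings(size_mb=5, interval_seconds=300, retry_seconds=300)
hints = settings["BufferingHints"]
print(should_flush(6 * 1024 * 1024, 10, hints))
```

A small buffer and a short interval give fresher data in Amazon Elasticsearch Service at the cost of more frequent, smaller deliveries.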

For more information about sizing, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

Solution overview

To show you how to use this new feature, this post uses stock demo data available on the Kinesis Data Firehose console to deliver to an Amazon Elasticsearch Service endpoint in VPC. The following diagram illustrates the workflow.

This use case simulates a producer sending stock ticker data to the delivery stream (A). You use an AWS Lambda function (B) to add a timestamp to the stock records so that you can create Kibana visualization. Kinesis Data Firehose streams the stock records to the Amazon Elasticsearch Service endpoint (C) in your VPC. Finally, you can visualize the data using Kibana (D).
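The Lambda function used in this post is not listed, so the following is a minimal sketch of what a timestamp-adding transformation could look like. It follows the record contract of Kinesis Data Firehose data transformation (base64-encoded `data`, and a `recordId`, `result`, and `data` field on each returned record). The `timestamp` field name, the ISO 8601 format, and the sample stock record are assumptions.

```python
import base64
import json
from datetime import datetime, timezone


def lambda_handler(event, context):
    """Add a UTC timestamp to every stock record in the batch."""
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Assumed field name and format for the added timestamp.
        payload["timestamp"] = datetime.now(timezone.utc).isoformat()
        data = base64.b64encode(json.dumps(payload).encode("utf-8"))
        output.append({
            "recordId": record["recordId"],  # must match the incoming record
            "result": "Ok",
            "data": data.decode("utf-8"),
        })
    return {"records": output}


stock = {"TICKER_SYMBOL": "QXZ", "SECTOR": "HEALTHCARE",
         "CHANGE": -0.05, "PRICE": 84.51}
sample_event = {"records": [{
    "recordId": "1",
    "data": base64.b64encode(json.dumps(stock).encode("utf-8")).decode("utf-8"),
}]}

response = lambda_handler(sample_event, None)
decoded = json.loads(base64.b64decode(response["records"][0]["data"]))
print(decoded)
```

Kibana needs a date field to build time-based visualizations, which is why the record gets a timestamp before it reaches the domain.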

This post uses the AWS Management Console to implement this solution, but you can also use the AWS CLI.

Creating security groups

Start by creating two security groups: one for the Amazon Elasticsearch Service VPC endpoint (es-sec-grp) and another for the delivery stream (kdf-sec-grp). Create security groups without any rules first. After you have created them, set the inbound and outbound rules. The following table summarizes these rules.

Creating an Amazon Elasticsearch Service VPC endpoint

To create an Amazon Elasticsearch Service endpoint in VPC, complete the following steps:

  1. On the Amazon Elasticsearch Service console, choose Create a new Domain.
  2. For Deployment Type and Latest Version, choose Development and Testing.
  3. Choose Next.
  4. Give your Amazon Elasticsearch Service endpoint a name.
  5. Select your instance type.

This post uses m5.xlarge.elasticsearch. For production environments, select the appropriately sized instance type. For this post, leave the number of nodes at 1, though best practice is to set it to 2.

  1. Set EBS storage size per node to 100 GiB.
  2. Leave the rest of the settings at their defaults and choose Next.
  3. Select the VPC and private subnet for your Amazon Elasticsearch Service endpoint and the security group for Amazon Elasticsearch Service that you created previously (es-sec-grp).
  4. To access Kibana, choose fine-grained access.
  5. Choose Create Master User.

This post uses the internal user database with HTTP basic authentication enabled. For production environments, use IAM roles and configure the appropriate fine-grained access. For more information, see Fine-Grained Access Control in Amazon Elasticsearch Service.

  1. Choose Allow open access to Domain.

Security groups already enforce IP-based access policies. This step opens access to your Amazon Elasticsearch Service endpoint to resources in your VPC, and your Amazon Elasticsearch Service endpoint is not accessible to the internet. For an additional layer of security in your Amazon Elasticsearch Service endpoint, use access policies that specify IAM users or roles. For more information about controlling access to your domains, see Identity and Access Management in Amazon Elasticsearch Service.

  1. Choose Next.
  2. Review your settings and choose Confirm.

The following screenshot shows an example of what your Amazon Elasticsearch Service endpoint VPC settings should look like.

Creating a Lambda function for record transformation

Create a Lambda function to add a timestamp to the data feed. Complete the following steps:

  1. On the Lambda console, choose Create Function.
  2. Choose Author from scratch.
  3. Name your function; for example, tmakAddTSToStream.
  4. Choose Python 3.7 as your runtime.
  5. Choose Create.

The following code is for your Lambda function (under the basic settings section, change the timeout from 3 sec to 45 sec):

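import base64
import json
from datetime import datetime


def lambda_handler(event, context):
    """Add a UTC timestamp to every stock record in the Firehose batch."""
    send_back = []
    # One timestamp per invocation, so all records in the batch share it.
    now = datetime.utcnow().isoformat()

    for record in event['records']:
        # Firehose passes each record payload base64-encoded.
        stock_rec = json.loads(base64.b64decode(record['data']))
        stock_rec["timestamp"] = now

        record_w_ts = {
            # The recordId must match the incoming record.
            'recordId': record['recordId'],
            'result': 'Ok',
            # Re-encode the record, newline-terminated, for delivery.
            'data': base64.b64encode(json.dumps(stock_rec).encode('utf-8') + b'\n').decode('utf-8')
        }
        send_back.append(record_w_ts)

    return {'records': send_back}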
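Before attaching the function to a delivery stream, it can help to exercise it locally. The following is a minimal sketch under stated assumptions: the helper names make_firehose_event and decode_firehose_response are hypothetical, and the event contains only the fields the handler above reads (records, recordId, and data), not the full event that Kinesis Data Firehose sends.

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap dicts in the minimal event shape that the transformation handler reads."""
    return {
        "records": [
            {
                "recordId": str(i),
                "data": base64.b64encode(json.dumps(p).encode("utf-8")).decode("utf-8"),
            }
            for i, p in enumerate(payloads)
        ]
    }


def decode_firehose_response(response):
    """Decode successfully transformed records back into dicts for inspection."""
    return [
        json.loads(base64.b64decode(r["data"]))
        for r in response["records"]
        if r["result"] == "Ok"
    ]
```

For example, passing make_firehose_event([{"ticker_symbol": "QXZ", "price": 84.51}]) to lambda_handler and decoding the result with decode_firehose_response should return the same record with an added timestamp field.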

Creating a Kinesis Data Firehose delivery stream

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, under Data Firehose, choose Create Delivery Stream.
  2. Enter a name for your stream; for example, tmak-kdf-stock-delivery-stream.
  3. For source, choose Direct PUT or other sources.
  4. Choose Next.
  5. For Data transformation, choose Enabled.
  6. Choose the Lambda function you created.
  7. Choose Next.
  8. Choose Amazon Elasticsearch Service as the destination for your delivery stream.
  9. For Index, enter stockdata.

The VPC section populates automatically. Make sure you use the security group you created for Kinesis Data Firehose (kdf-sec-grp).

  1. For Backup Mode, choose Failed records only.

You can select an existing S3 bucket or create a new one. The following screenshot shows an example of your delivery stream settings.

  1. Choose Next.
  2. Review the buffering settings and set any tags to identify your stream.

A delivery stream that delivers to VPC destinations needs permissions to manage ENIs, list VPCs, and subnets. The console gives you the option to create a new role based on a template that includes all the needed permissions. You can also use an existing role if you already created one.

  1. Choose Next.
  2. Review the settings and choose Create Stream.

It may take up to a few minutes to see the stream status show as Active. See the following screenshot.

On the Amazon EC2 console, under Network and Security, you can see the endpoints created in your VPC by Kinesis Data Firehose and Amazon ES. See the following screenshot.

Configuring Kibana fine-grained access for Kinesis Data Firehose

You need to give Kinesis Data Firehose permissions to deliver stock data to your Amazon Elasticsearch Service endpoint. You can accomplish this via the Kibana console or API. For more information, see API on the Open Distro for Elasticsearch website.

For more information about controlling access to your Amazon Elasticsearch Service endpoint, see How to Control Access to Your Amazon Elasticsearch Service Domain.

Because your Amazon Elasticsearch Service endpoint is in the VPC, you must first connect to the VPC to access Kibana. This process varies by network configuration, but likely involves connecting to a VPN or corporate network. For this post, create a remote desktop EC2 instance in a public subnet of your VPC. The newly created security group (rdp-sec-grp) protects the instance. You can modify the es-sec-grp security group to allow inbound HTTPS traffic from rdp-sec-grp so you can access the Kibana URL. The following diagram illustrates this architecture.

Kinesis Data Firehose uses the delivery role to sign HTTP (Signature Version 4) requests before sending the data to the Amazon Elasticsearch Service endpoint. You manage Amazon Elasticsearch Service fine-grained access control permissions using roles, users, and mappings. This section describes how to create roles and set permissions for Kinesis Data Firehose.

The roles you create in this section are different from IAM roles. For more information, see Key Concepts.

Complete the following steps:

  1. Navigate to Kibana (you can find the URL on the Amazon Elasticsearch Service console).
  2. Enter the master user and password that you set up when you created the Amazon Elasticsearch Service endpoint.
  3. Under Security, choose Roles.
  4. Choose Add New Role.
  5. Name your role; for example, firehose-role.
  6. For cluster permissions, add cluster_composite_ops and cluster_monitor.
  7. Under Index permissions, choose Index Patterns and enter stockdata*.
  8. Under Permissions, add three action groups: crud, create_index, and manage.
  9. Choose Save Role Definition.

In the next step, you map the IAM role that Kinesis Data Firehose uses to the role you just created.

  1. Under Security, choose Role Mappings.
  2. Choose the role you just created (firehose-role).
  3. For Backend Roles, choose Add Backend Role.
  4. Enter the IAM ARN of the role Kinesis Data Firehose uses: arn:aws:iam::123456789012:role/firehose_stream_role_name.

You can find the IAM role that your delivery stream uses on the Kinesis Data Firehose console.
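If you prefer the API to the Kibana console, the same role and role mapping can be expressed as two REST requests. The following is a minimal sketch that only builds the requests and does not send them. It assumes the Open Distro for Elasticsearch security plugin REST API, with its roles and rolesmapping endpoints and the cluster_permissions and index_permissions body format; verify both against the version running on your domain. The function names are hypothetical.

```python
ROLE_NAME = "firehose-role"  # role name used in the console steps above


def build_role_request(index_pattern="stockdata*"):
    """Return (method, path, body) for creating the fine-grained access role."""
    body = {
        "cluster_permissions": ["cluster_composite_ops", "cluster_monitor"],
        "index_permissions": [
            {
                "index_patterns": [index_pattern],
                "allowed_actions": ["crud", "create_index", "manage"],
            }
        ],
    }
    return "PUT", f"_opendistro/_security/api/roles/{ROLE_NAME}", body


def build_role_mapping_request(iam_role_arn):
    """Return (method, path, body) mapping the delivery IAM role as a backend role."""
    body = {"backend_roles": [iam_role_arn]}
    return "PUT", f"_opendistro/_security/api/rolesmapping/{ROLE_NAME}", body
```

You would send these requests to your domain endpoint from inside the VPC, authenticated as the master user.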

Streaming stock data through Kinesis Data Firehose

To stream your stock data, complete the following steps:

  1. On the Kinesis Data Firehose console, choose the stream you created.
  2. Choose Test with demo data.
  3. Choose Start sending demo data.

If everything is working, you see the message Demo data is being sent to your delivery stream. Wait a few minutes before you choose Stop sending demo data.

Analyzing and visualizing data

To analyze and visualize your data, complete the following steps:

  1. On the Kibana console, choose Management.
  2. Choose Index patterns.
  3. For Index pattern, enter stockdata*.
  4. Choose Next.
  5. For the Time filter field, choose timestamp.
  6. Choose Visualize.
  7. Create a new visualization and choose Line.
  8. For Index pattern, choose stockdata*.
  9. For Y-Axis, choose Aggregation=Average and Field=price.
  10. For X-Axis, choose Aggregation=Date Histogram, Field=timestamp, and Interval=seconds.
  11. Under X-Axis, choose Add Sub-buckets.
  12. Choose Split Series.
  13. Set Sub-Aggregation=Terms and Field=ticker_symbol.keyword.
  14. Choose Apply Changes.

The following screenshot shows an example visualization.

You can see the raw data by choosing Discover on the Kibana dashboard. See the following screenshot.

Summary

This post demonstrated how you can use Kinesis Data Firehose to stream data to an Amazon Elasticsearch Service endpoint inside your VPC. You do not need to enable and secure public access to your Amazon Elasticsearch Service endpoint. If you have been reluctant to expose your Amazon Elasticsearch Service endpoint to the internet but want to stream data, you can now do so with Kinesis Data Firehose.

 


About the Authors

Tarik Makota is a Principal Solutions Architect with Amazon Web Services. He provides technical guidance, design advice, and thought leadership to AWS customers across the US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

Serverless Stream-Based Processing for Real-Time Insights

Post Syndicated from Justin Pirtle original https://aws.amazon.com/blogs/architecture/serverless-stream-based-processing-for-real-time-insights/

Building on our previous posts regarding messaging patterns and queue-based processing, we now explore stream-based processing and how it helps you achieve low-latency, near real-time data processing in your applications. AWS offers two managed services for streaming, Amazon Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

What is streaming data?

At AWS, we define streaming data as data that is emitted at high volume in a continuous, incremental manner with the goal of low-latency processing. Whereas traditional batch-oriented business intelligence would offer insights in retrospect after months, days, or hours have passed, stream-based processing can offer actionable insights in real time. Stream-based processing is commonly used to respond to clickstream events, rapidly ingest various types of logs, and extract, transform, and load (ETL) data in real-time into data lakes and data warehouses.

Amazon Kinesis is the AWS service that makes it easy to collect, process, and analyze such real-time, streaming data with four different capabilities:

For this blog post, we focus on Kinesis Data Streams and Kinesis Data Firehose, since both of these services are foundational for streaming, ingestion, buffering, and processing in your streaming data pipeline.

Kinesis Data Streams

Amazon Kinesis Data Streams is a massively scalable service that can continuously capture gigabytes of data per second from hundreds of thousands of sources. Like many distributed systems, Kinesis Data Streams achieves this level of scalability by partitioning or sharding your data where records are simultaneously written to and read from different shards in parallel. All Kinesis Data Streams require allocation of at least one shard and you choose how many shards you want to allocate to a given stream.

When writing to a shard in a Kinesis Data Stream, each shard supports ingestion of up to 1 MB of data per second or 1,000 records written per second. When reading from a shard, each shard supports output of 2 MB of data per second. You choose an initial number of shards to allocate for your Kinesis Data Stream, then can update your shard allocation over time. Increasing your shard allocation enables your application to easily scale from thousands of records to millions of records written per second.
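The per-shard limits above translate directly into a sizing calculation. The following is a minimal sketch; the function name shards_needed is hypothetical, and it considers only the three limits quoted in this post (1 MB per second written, 1,000 records per second written, 2 MB per second read).

```python
import math


def shards_needed(write_mb_per_s, records_per_s, read_mb_per_s=0.0):
    """Estimate the shard count that satisfies every per-shard limit at once."""
    by_write_bytes = math.ceil(write_mb_per_s / 1.0)     # 1 MB/s written per shard
    by_write_records = math.ceil(records_per_s / 1000)   # 1,000 records/s per shard
    by_read_bytes = math.ceil(read_mb_per_s / 2.0)       # 2 MB/s read per shard
    # A stream always needs at least one shard.
    return max(1, by_write_bytes, by_write_records, by_read_bytes)
```

For example, a producer writing 0.5 MB per second as 12,000 small records per second is bound by the record limit rather than the byte limit, so it needs 12 shards.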

Producing streaming data

Streaming data producers are processes that put records onto a Kinesis stream by calling the putRecord API to write a single record or the putRecords API to write multiple records in a single invocation. Common approaches for producing messages involve direct use of AWS tools, including:

  • AWS SDK, which simplifies authentication and other semantics of invoking AWS service APIs
  • Amazon Kinesis Agent, which enables local file/log monitoring and rotation sending in real time
  • Amazon Kinesis Producer Library, which simplifies aggregating records into larger payloads to improve throughput.

Additionally, several AWS services natively integrate with Amazon Kinesis as a data producer:

There are also several third-party services that offer native integration as data producers, including:

Regardless of the producer service or tool of choice, all data producers put records onto a stream by providing a partition key, stream name, and the data itself, which altogether must not exceed 1 MB in size. The partition key provided is used to determine which shard the data should be written to on the stream. Amazon Kinesis Data Streams offers ordering guarantees and maintains message ordering within a given shard in a stream, using sequence numbers to track the unique position of each message sent.
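To make the partition key to shard mapping concrete: Kinesis Data Streams hashes each partition key with MD5 into a 128-bit integer and writes the record to the shard whose hash key range contains that value. The following is a minimal sketch of that idea. It assumes the shards split the hash key space into equal ranges, which is not guaranteed after resharding, and the function names are hypothetical.

```python
import hashlib


def hash_key(partition_key):
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key, shard_count):
    """Pick a shard, assuming shard_count equal slices of the 128-bit key space."""
    range_size = 2**128 // shard_count
    # Clamp so keys in the remainder of the division land in the last shard.
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

Because the mapping is deterministic, all records that share a partition key land in the same shard, which is what makes per-key ordering possible.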

Consuming streaming data

Once records are written to a Kinesis Data Stream, they are buffered in their respective shards for consumption. Unlike queue-based processing, the records are buffered until the data retention period set on the stream elapses, enabling one or more consumers to replay all of the messages in the shards of the stream. If your application must deliver your records to a data lake, data warehouse, Elasticsearch Service cluster, or Splunk, Kinesis Data Firehose can natively deliver your records to the following without needing to write any custom code:

  • Amazon S3
  • Amazon Redshift
  • Amazon Elasticsearch Service
  • Splunk

You simply indicate the desired delivery destination and the configuration for how to batch and deliver the messages. Kinesis Data Firehose can also apply your desired S3 object naming, Amazon Redshift table name, Amazon Elasticsearch index name, and more.

For custom processing or destinations outside of the Amazon Kinesis Data Firehose supported services above, you need to write and execute custom code to consume data from the stream. Though you can use the Kinesis Client Library (KCL) to run your own custom processing application on persistent virtual machines or container instances, AWS Lambda offers serverless computing with native event source integration with Amazon Kinesis Data Streams. AWS Lambda as a stream consumer takes care of the operational overhead of reading shards, maintaining record order, checkpointing as records are processed, and parallelizing processing.

Serverless stream processing with AWS Lambda

When configured with a Kinesis Stream as its event source, AWS Lambda continuously polls every shard in your stream at no extra charge and only invokes your Lambda code if and when there are messages in the stream. It additionally scales up the number of concurrent executions to parallelize reading all shards of a stream at the same time (and can have multiple executions reading the same shard simultaneously for a higher parallelization factor, if desired). AWS Lambda automatically checkpoints which records were successfully processed and handles retries and any failures automatically according to your desired configuration.

Best of all, there is no additional cost of the Lambda service handling all of these operational needs for you. You only pay for compute time when your function is invoked and messages are available on the stream for processing. You’re able to focus on processing your data with your business logic directly in your code since your records are sent as an array to your Lambda code. There is no additional code to author/manage regarding checkpointing, shard splits/merges, or other complexities.
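To illustrate how records arrive as an array, the following is a minimal sketch of a Lambda handler for a Kinesis Data Streams event source. It assumes that producers write JSON payloads; the returned summary dictionary is only for illustration, and your business logic would replace the loop body.

```python
import base64
import json


def lambda_handler(event, context):
    """Process one batch of Kinesis records delivered by the event source mapping."""
    partition_keys = []
    for record in event["Records"]:
        # Record data is base64-encoded in the Lambda event.
        payload = base64.b64decode(record["kinesis"]["data"])
        message = json.loads(payload)  # assumption: producers send JSON
        # Business logic on `message` goes here.
        partition_keys.append(record["kinesis"]["partitionKey"])
    return {"processed": len(partition_keys), "partition_keys": partition_keys}
```

Note that the handler contains no checkpointing or shard management code; if it raises an exception, Lambda retries the batch according to the event source configuration.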

Conclusion

In this blog, we defined streaming data and explored the Amazon Kinesis service and its various capabilities. We then reviewed the various options available for producing and consuming real-time streaming data with Amazon Kinesis, including using AWS Lambda for serverless streaming data processing. Please refer to the following resources for further learning on AWS streaming data processing:

Build a cloud-native network performance analytics solution on AWS for wireless service providers

Post Syndicated from Angelo Sampietro original https://aws.amazon.com/blogs/big-data/build-a-cloud-native-network-performance-analytics-solution-on-aws-for-wireless-service-providers/

This post demonstrates a serverless, cloud-based approach to building a network performance analytics solution using AWS services that can provide flexibility and performance while keeping costs under control with pay-per-use AWS services.

Without good network performance, you may struggle to face the challenges of real-time and low latency services and the increase of the total bandwidth your customers consume.

Considering the large volume of data that you need to ingest, store, and process every second for optimal performance monitoring, standard on-premises monitoring approaches are no longer efficient.

A cloud-native approach allows you to invest in the solutions that generate business value and move from the typical capital expenditure model to an operational expenditure model by avoiding upfront costs and over-provisioning of infrastructure.

Data and voice network complexity for mobile service providers

According to Cisco’s global mobile data traffic forecast, there will be 13.1 billion mobile-connected devices by 2023, and 1.4 billion of those will be 5G capable.

As a mobile service provider, you must understand how to perform accurate network planning and sizing for your access and core networks.

The increase in the global demand for network throughput and the number of services such as VoLTE, IoT, and video streaming on mobile networks is forcing mobile service providers to implement new architectures to match the desired quality of service (QoS).

Addressing optimal QoS when a multitude of services are running on a converged network is not an easy task. The workflow is complex, starting from collecting counters and statistics data from a multitude of different network elements to transforming the collected data into key performance indicators (KPIs) that you can link to the quality of one of the multiple services delivered over the network.

Modern mobile networks, with deployment of 4G, 5G, and IoT services, have given rise to an increased number of cells that are deployed on the territory, so you must collect counters and generate KPIs on thousands of different network elements.

Considering that every network element can generate a few thousand counters, the network performance system has to manage millions of measurements at every collection cycle.

This is difficult to manage at scale with on-premises deployments without a high-cost solution. Instead, you can use AWS services to design a modern network performance analytics solution that covers all the requirements of different departments of telecommunications service providers (TSPs).

Data and voice network architecture

The main problem that you may face as a service provider is the complexity of modern mobile networks, which originates from several evolutions of communication standards in the last few decades (from 2G to 5G for data core, from CS to VoLTE for voice core) and the hardware and capabilities of the network elements.

The following diagram illustrates a simplified schema of a currently deployed mobile wireless network, where you can find:

  • Access network, with the network elements needed for the coverage from 2G to 5G
  • Core network, which includes the network elements for all the functions needed to deliver the services, the authentication, and the database of all the users on the network
  • Services delivered by the network, which include voice (PSTN/PLMN), internet (data service), and application services

IoT network architecture and service differentiation

The IoT traffic paradigm is completely different from the traffic patterns of smartphones and tablets. These mobile devices normally open an always-on session (PDP context or bearer) and generate a considerable amount of user plane traffic. An IoT device opens a session, sends a few bytes, and closes the session to limit the power consumption and avoid allocating resources on cells when not necessary.

The following diagram depicts a real-world use case in which a service provider deploys IoT services. In most cases, IoT traffic is delivered to a different 4G core for the following reasons:

  • EPC optimization needed to manage IoT traffic
  • Avoiding IoT traffic managed by the same core network of customers’ traffic for security reasons
  • IoT devices generate huge volumes of control plane traffic that can impact customer performance if managed by the same network

This usage pattern requires a different traffic model and forces mobile service providers to perform firmware upgrades to the current cells or deploy new cells into the network. This has an impact on the QoS due to the increase in the number of counters and cells.

What is network performance, and why is it important?

You can use network performance monitoring systems to generate reports and insights and track network performance. Multiple organizations like Operations, Network Planning, and Engineering use these tools to view the overall quality of networks and services and control important KPIs like availability, response times, and download and upload speeds.

Network performance is strictly connected to the QoS that, by definition, is the mechanism to control the performance, reliability, and usability of a telecommunications service.

Pain points for service providers

Every vendor of network elements usually also provides the element managers for the hardware. These systems are proprietary, but most of the time, they comply with the standard provided by 3rd Generation Partnership Project (3GPP) for the XML file format of the performance measurement (PM) files to export the counters you need to measure network performance accurately.

Service providers normally also use multiple vendors for core and access network elements, so monitoring a heterogeneous environment is one of the main challenges in today’s deployments.

Data types and 3GPP standards

Counters and statistics about the performance on a network element are exported in PM files. 3GPP standardized the format in a specific document, which you can download from the 3GPP website.

The network vendor is not guaranteed to follow this standard, but even if there are some customizations, the structure and format in most cases remain very similar to the open standard.

The following schema from 3GPP describes the XML format for PM that is exported from network elements.

The XML schema root element is “measCollecFile” and it has three child elements:

  1. “fileHeader”: contains information such as file sender and the beginning time of the measurements.
  2. “fileFooter”: contains the end time of the measurements.
  3. “measData”: contains all the measurements information such as measurement types and their values.

The most important tags are:

  • measInfo – Contains the family of the measurements, the granularity, and the counters list for each measValue
  • measValue – Contains multiple measResults with the results of the measurements

Building a network performance solution on AWS

In this section we describe a possible architecture that can be used to implement the solution on AWS following a serverless approach.

Prerequisites

To implement this solution, you must have the following prerequisites:

  • An AWS account in the same Region
  • The AdministratorAccess policy granted to your AWS account (for production, you should restrict access as needed)

This post uses the EU (Ireland) Region. However, you can choose any other Region where the following services are available:

For more information about AWS Regions and where AWS services are available, see Region Table.

The following diagram illustrates the high-level, end-to-end solution and the AWS services it uses. The workflow begins with the ingestion of the files using SFTP or Kinesis Data Firehose. Data is stored in Amazon S3 and processed with a Lambda function and AWS Glue to create a data catalog. Data is queried with Athena and visualized in Amazon QuickSight.

Collecting data using Kinesis Data Firehose or AWS SFTP

For information about collecting PM files via element managers, see chapter 5 of Technical Specification 3GPP TS 32.432 on the 3GPP website.

Element managers act as collectors of XML measurement files that the network elements provide. They can send the files to an S3 bucket via AWS SFTP or Kinesis Data Firehose.

For the data transfer, see the documentation about creating a Kinesis Data Firehose delivery stream and sending data to a Kinesis Data Firehose delivery stream. If you plan to use SFTP transfer, see how AWS Transfer for SFTP works.

Transforming data using a Lambda function

This post provides a Lambda function that is associated with the ObjectCreated event type of the destination S3 bucket rawxml prefix (s3://wireless-pm/rawxml). The function runs every time a new XML file is saved in this location.

The Lambda function is written in Python 3.7. You can download it from the GitHub repository for this blog post. The function uses a layer to resolve the dependency for the xmltodict library used in the code.
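To show what the trigger hands to the function, the following is a minimal sketch of how a handler could pick out newly created XML objects from an S3 event notification. It is not the code from the GitHub repository: the function name new_xml_objects is hypothetical, and only the rawxml prefix comes from this post.

```python
from urllib.parse import unquote_plus

RAW_PREFIX = "rawxml/"  # prefix watched by the trigger described above


def new_xml_objects(event):
    """Return (bucket, key) pairs for objects created under RAW_PREFIX."""
    found = []
    for record in event.get("Records", []):
        # Event names look like "ObjectCreated:Put" or "ObjectCreated:Copy".
        if not record.get("eventName", "").startswith("ObjectCreated"):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.startswith(RAW_PREFIX):
            found.append((bucket, key))
    return found
```

Each returned pair identifies one XML file to download and transform.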

The following screenshot of the AWS Management Console shows some of the main properties of the Lambda function:

  1. The Lambda name and its associated Lambda Layer.
  2. The S3 bucket trigger event on the ObjectCreated event.

The function transposes the XML files, converts them into CSV or JSON (depending on the value set in the function’s output_format environment variable), and formats the files with one record per measurement (measData), measure type, and value (measInfo). The technical specification 3GPP TS 32.435 document on the 3GPP website provides three example XML files in the Annex section.

The following code is an example A.1 XML file:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="MeasDataCollection.xsl"?>
<measCollecFile xmlns="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec">
    <fileHeader fileFormatVersion="32.435 V7.0" vendorName="Company NN" dnPrefix="DC=a1.companyNN.com,SubNetwork=1,IRPAgent=1">
        <fileSender localDn="SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1" elementType="RNC"/>
        <measCollec beginTime="2000-03-01T14:00:00+02:00"/>
    </fileHeader>
    <measData>
        <managedElement localDn="SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1" userLabel="RNC Telecomville"/>
        <measInfo>
            <job jobId="1231"/>
            <granPeriod duration="PT900S" endTime="2000-03-01T14:14:30+02:00"/>
            <repPeriod duration="PT1800S"/>
            <measTypes>attTCHSeizures succTCHSeizures attImmediateAssignProcs succImmediateAssignProcs</measTypes>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-997">
                <measResults>234 345 567 789</measResults>
            </measValue>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-998">
                <measResults>890 901 123 234</measResults>
            </measValue>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-999">
                <measResults>456 567 678 789</measResults>
                <suspect>true</suspect>
            </measValue>
        </measInfo>
    </measData>
    <fileFooter>
        <measCollec endTime="2000-03-01T14:15:00+02:00"/>
    </fileFooter>
</measCollecFile>

The Lambda function transforms the preceding file into the following JSON format (this code example only shows the first record of the transformed and transposed dataset):

{
    "fh_file_format_version": "32.435 V7.0",
    "fh_vendor_name": "Company NN",
    "fh_dn_prefix": "DC=a1.companyNN.com,SubNetwork=1,IRPAgent=1",
    "fh_fs_local_dn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
    "fh_fs_element_type": "RNC",
    "fh_mc_begin_time": "2000-03-01T14:00:00+02:00",
    "ff_mc_end_time": "2000-03-01T14:15:00+02:00",
    "md_me_local_dn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
    "md_me_user_label": "RNC Telecomville",
    "md_me_sw_version": "",
    "md_mi_meas_info_id": "",
    "md_mi_job_jobid": "1231",
    "md_mi_gp_duration": "PT900S",
    "md_mi_gp_end_time": "2000-03-01T14:14:30+02:00",
    "md_mi_rp_duration": "PT1800S",
    "md_mi_meas_obj_ldn": "RncFunction=RF-1,UtranCell=Gbg-997",
    "md_mi_meas_name": "attTCHSeizures",
    "md_mi_meas_value": 234,
    "md_mi_meas_p": null,
    "md_mi_meas_suspect": null
}

You can change the output field names in the get_record_header static method of the GPPXml class defined in the Lambda function. The fields md_mi_meas_name and md_mi_meas_value contain the measure name and measure value, respectively.
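The transposition step can be illustrated with a short example. The following is a minimal sketch, not the function from the GitHub repository: it uses the standard library xml.etree.ElementTree instead of xmltodict, works on a shortened sample file, emits only four of the output fields, and assumes that all measurement results are integers. The function name transpose is hypothetical.

```python
import xml.etree.ElementTree as ET

NS = {"pm": "http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec"}

SAMPLE = """<measCollecFile xmlns="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec">
  <measData>
    <measInfo>
      <granPeriod duration="PT900S" endTime="2000-03-01T14:14:30+02:00"/>
      <measTypes>attTCHSeizures succTCHSeizures</measTypes>
      <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-997">
        <measResults>234 345</measResults>
      </measValue>
    </measInfo>
  </measData>
</measCollecFile>"""


def transpose(xml_text):
    """Emit one record per (measured object, measurement type) pair."""
    root = ET.fromstring(xml_text)
    records = []
    for info in root.iterfind("pm:measData/pm:measInfo", NS):
        # measTypes lists the counter names; measResults lists values in the same order.
        names = info.findtext("pm:measTypes", default="", namespaces=NS).split()
        gran = info.find("pm:granPeriod", NS)
        end_time = gran.get("endTime") if gran is not None else None
        for value in info.iterfind("pm:measValue", NS):
            results = value.findtext("pm:measResults", default="", namespaces=NS).split()
            for name, result in zip(names, results):
                records.append({
                    "md_mi_gp_end_time": end_time,
                    "md_mi_meas_obj_ldn": value.get("measObjLdn"),
                    "md_mi_meas_name": name,
                    "md_mi_meas_value": int(result),  # assumption: integer counters
                })
    return records
```

Calling transpose(SAMPLE) yields two records for the same cell, one for attTCHSeizures and one for succTCHSeizures.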

The transformed CSV and JSON files are saved in the raw_transform_csv and raw_transform_json prefixes, respectively, in the S3 bucket (only one format is created for each execution, depending on the value of the output_format environment variable). The following screenshot shows the S3 bucket overview on the Amazon S3 console.

For this use case, a Lambda function is triggered for the transformation task every time a new XML file arrives. Depending on the volume and size of the XML files, you can choose other compute options on AWS that fit better, such as containers running on AWS Batch, Amazon ECS, or Amazon EKS. For more information about configuring and running processes using Amazon ECS, see Building, deploying, and operating containerized applications with AWS Fargate.

Alternatively, you could ingest the raw XML file using a Kinesis Firehose stream with a Lambda function attached to it to transform the data to JSON and output directly to Amazon S3 in Parquet or ORC format using the record format conversion feature of Kinesis Firehose. You would need to predefine a table in the Data Catalog whose schema, serializer, and deserializer you use to convert the data.

Building a Data Catalog using AWS Glue

To create an AWS Glue table in the Data Catalog, first create an AWS Glue crawler. Complete the following steps:

  1. Select AWS Glue from the services in the AWS Console.
  2. Select Crawlers in the left side menu.
  3. Click the button “Add Crawlers”.
  4. For Crawler name, enter a name for your crawler; for example, Wireless_PM_crawler.
  5. Choose Next.
  6. For Choose a data store, choose S3.
  7. For Crawl data in, select Specified path in my account.
  8. For Include path, enter the path to the input files in CSV format.
  9. Choose Next.
  10. In the Choose an IAM role section, select Choose an existing IAM role.
  11. For IAM role, enter the name of your role to grant read permission to access the S3 bucket.
  12. Choose Next. The following code is the AWSGlueServiceRole-S3 IAM policy for this post:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::npm-blog/raw_transform_csv/Parquet*",
                    "arn:aws:s3:::npm-blog/raw_transform_csv/*"
                ]
            }
        ]
    }

  13. In the Choose the crawler’s output section, for Database, choose the database in which the Data Catalog table is created; for example, wireless_pm.
  14. Choose Next. In these next steps, you create an ETL job that converts from CSV to Parquet (a columnar file format) and writes the Parquet files to an S3 bucket. The Parquet file format improves the performance of downstream consumption of the data. To convert to Parquet, you can either use an AWS Glue generated script (which this post uses) or use your own PySpark or Scala script.
  15. In the Configure the job properties section, for Name, enter wireless_pm_parquet_conversion.
  16. For IAM role, enter AWSGlueServiceRole-wirelesspm.
  17. For Type, choose Spark.
  18. For Glue version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  19. For This job runs, select A proposed script generated by AWS Glue.
  20. For Script file name, enter wireless_pm_parquet_conversion.
  21. In the Choose a data target section, select Create tables in your data target.
  22. For Data store, choose Amazon S3.
  23. For Format, choose Parquet.
  24. For Target path, enter the path to your S3 bucket.
  25. Choose Next.

You can configure a trigger to start the job by creating a workflow in AWS Glue that runs the job after the crawler. When the job is complete, a new set of Parquet files is stored in the destination S3 bucket.

You can analyze the processed data directly with Athena by selecting the database created in AWS Glue, and analyzing the data from the AWS Glue table.

The following screenshot shows the AWS Glue database and the two tables created.

The following screenshot shows an example Athena query, which you can run by selecting a few columns to validate the data from the Data Catalog table you created.

You can also use other AWS services for analytics such as Amazon Redshift or Amazon EMR for further processing, analysis, and KPI calculations from the data.

Visualizing data using Amazon QuickSight

Amazon QuickSight is a business analytics service you can use to build visualizations, perform ad hoc analysis, and get business insights from your data. For more information, see What Is Amazon QuickSight?

Amazon QuickSight provides out-of-the-box integration with Athena, which lets you run SQL queries on top of the metadata in your Data Catalog. For more information, see Creating a Data Set Using Amazon Athena Data.

The following screenshot shows an Amazon QuickSight analysis created from an Amazon QuickSight dataset. The dataset is based on the table that AWS Glue defined, and you query it via Athena.

You can create a visualization by loading the example dataset of PM counters. For example, the following screenshot shows the measurement values for the GPRS SuccessAttach and AbortedAttach and highlights a problem.

Summary

This post discussed the main pain points for wireless service providers and how AWS services can help you build a serverless solution that scales according to your network growth with no upfront costs.

This solution also helps you visualize and analyze data. Additionally, it provides insights that can help operations and network planning departments manage and evolve their network according to current and future standards and services.

As always, AWS welcomes feedback. Please submit comments or questions in the comments section.

 


About the Authors

Angelo Sampietro is a senior partner solutions architect at Amazon Web Services. Angelo has a strong background in cloud computing, with 20 years of experience in the telecommunications industry, working in the United States and Europe. With five AWS Certifications, including the Big Data and Machine Learning Specialties, he helps Global System Integrators succeed in their partnership with AWS. He loves to share new ideas with colleagues and friends and to propose new solutions by thinking outside the box.

Francesco Marelli is a senior solutions architect at Amazon Web Services. He lived and worked in London for 10 years, and has since worked in Italy, Switzerland, and other countries in EMEA. He specializes in the design and implementation of analytics, data management, and big data systems, mainly for enterprise and FSI customers. Francesco also has strong experience in systems integration and in the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.

Introduction to Messaging for Modern Cloud Architecture

Post Syndicated from Sam Dengler original https://aws.amazon.com/blogs/architecture/introduction-to-messaging-for-modern-cloud-architecture/

We hope you’ve enjoyed reading our posts on best practices for your serverless applications. The posts in the following series will focus on best practices when introducing messaging patterns into your applications. Let’s review some core messaging concepts and see how they can be used to address challenges when designing modern cloud architectures.

Introduction

Applications can communicate information with each other using messages, a mechanism for packaging a data payload and associated metadata. The application that sends a message is called the producer and the application that receives the message is called the consumer. Producers and consumers exchange messages using a variety of transportation channels, for example point-to-point requests, message queues, subscription topics, or event buses. These transportation channels have different characteristics that make them useful when implementing message communication patterns. Dependencies emerge when producers and consumers exchange messages, which is called coupling.

Synchronous Communication

Message communication is called synchronous when the producer sends a message to the consumer and waits for a response before the producer continues its processing logic. An example of synchronous communication over a point-to-point channel is when an HTTP client makes a request to an HTTP service, waits for the service to process the request, and then applies logic to the HTTP response to determine how to proceed.

Synchronous communication patterns are more straightforward to implement; however, they create tight coupling between producers and consumers. Tight coupling can cause problems because traffic spikes and failures propagate directly throughout the application. For example, in a three-tier architecture, when the application experiences a spike in client traffic, the web tier directly translates the traffic spike into pressure on downstream resources (the logic and data tiers), which may not scale to meet the sudden demand. Likewise, a downstream resource failure in the logic or data tier directly prevents the web tier from responding to client requests. Applications can mimic a synchronous experience, for example a status spinner, using asynchronous communication with a polling or push notification strategy.

Asynchronous Communication

Message communication is called asynchronous when the producer sends a message to the consumer and proceeds without waiting for the response. An example of asynchronous communication over a message queue channel is when a client publishes a message to a queue, and after the queue acknowledges receipt of the message, the publisher proceeds without waiting for the consumer to process the message.

Asynchronous communication patterns are implemented using transportation channels such as queues, topics, and event buses to create loose coupling between producers and consumers. Loose coupling increases an architecture’s resiliency to failure and ability to handle traffic spikes because it creates an indirection between producer and consumer communication, enabling them to operate independently of each other. Using the three-tier architecture example, a message queue can be introduced between the web, logic, and data tiers to enable each tier to scale independently of the others. When the application experiences a spike in client traffic, the web tier translates the traffic spike into more messages on the queue for processing; however, the logic tier can continue to process messages off the queue at its own pace without being directly impacted.
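The decoupling described above can be sketched with an in-memory queue. This is only an illustration using the JDK's BlockingQueue as a stand-in for a managed queue service; the class name, the message names, and the 5 ms processing delay are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDecouplingSketch {

    // Publishes messageCount messages and returns them in the order the consumer processed them.
    static List<String> run(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // Consumer (think: logic tier) drains the queue at its own pace.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    String message = queue.take();
                    Thread.sleep(5); // simulate slow downstream processing
                    processed.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            // Producer (think: web tier): put() returns once the queue has accepted
            // the message, not once the consumer has processed it.
            for (int i = 0; i < messageCount; i++) {
                queue.put("request-" + i);
            }
            // Joining here is only so the demo can return what was processed.
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [request-0, request-1, request-2]
    }
}
```

The producer loop finishes almost immediately even though the consumer needs several milliseconds per message. A spike in published messages therefore lengthens the queue rather than putting direct pressure on the consumer.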

Considerations and Next Steps

Although asynchronous communication patterns can benefit modern cloud architectures, there are tradeoffs to consider. Asynchronous messaging adds latency to end-to-end processing time due to the addition of middleware. Producers and consumers take a dependency on the middleware stack, which must also scale to meet demand and be resilient to failure. Care must be taken to configure producers, consumers, and middleware to handle errors so that messages are not lost. More monitoring is required to ensure proper operations, and multiple logs must be correlated to troubleshoot and diagnose problems.

Amazon MQ, Amazon Kinesis, Amazon Simple Queue Service (SQS), Amazon Simple Notification Service (SNS), and Amazon EventBridge are highly available, large scale, failure resistant managed services that can be used to implement asynchronous messaging patterns. You can explore these services at the AWS Messaging page and their integration into Serverless Architectures in the free new digital course, Architecting Serverless Solutions. You can also visit the AWS Event-Driven Architecture page to learn how to apply messaging patterns to build event-driven solutions. The upcoming posts in this series will explore these AWS services to help ensure message patterns are implemented using best practices when applied to modern cloud architecture.

Streaming ETL with Apache Flink and Amazon Kinesis Data Analytics

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics/

Most businesses generate data continuously in real time and at ever-increasing volumes. Data is generated as users play mobile games, load balancers log requests, customers shop on your website, and temperature changes on IoT sensors. You can capitalize on time-sensitive events, improve customer experiences, increase efficiency, and drive innovation by analyzing this data quickly. The speed at which you get these insights often depends on how quickly you can load data into data lakes, data stores, and other analytics tools. As data volume and velocity increase, it becomes more important to not only load the incoming data, but also to transform and analyze it in near-real time.

This post looks at how to use Apache Flink as a basis for sophisticated streaming extract-transform-load (ETL) pipelines. Apache Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Kinesis Data Analytics, which enables you to build and run sophisticated streaming applications quickly, easily, and with low operational overhead.

This post discusses the concepts that are required to implement powerful and flexible streaming ETL pipelines with Apache Flink and Kinesis Data Analytics. It also looks at code examples for different sources and sinks. For more information, see the GitHub repo. The repo also contains an AWS CloudFormation template so you can get started in minutes and explore the example streaming ETL pipeline.

Architecture for streaming ETL with Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It supports a wide range of highly customizable connectors, including connectors for Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, and Amazon Simple Storage Service (Amazon S3). Moreover, Apache Flink provides a powerful API to transform, aggregate, and enrich events, and supports exactly-once semantics. Apache Flink is therefore a good foundation for the core of your streaming architecture.

To deploy and run the streaming ETL pipeline, the architecture relies on Kinesis Data Analytics. Kinesis Data Analytics enables you to run Flink applications in a fully managed environment. The service provisions and manages the required infrastructure, scales the Flink application in response to changing traffic patterns, and automatically recovers from infrastructure and application failures. You can combine the expressive Flink API for processing streaming data with the advantages of a managed service by using Kinesis Data Analytics to deploy and run Flink applications. It allows you to build robust streaming ETL pipelines and reduces the operational overhead of provisioning and operating infrastructure.

The architecture in this post takes advantage of several capabilities that you can achieve when you run Apache Flink with Kinesis Data Analytics. Specifically, the architecture supports the following:

  • Private network connectivity – Connect to resources in your Amazon Virtual Private Cloud (Amazon VPC), in your data center with a VPN connection, or in a remote region with a VPC peering connection
  • Multiple sources and sinks – Read and write data from Kinesis data streams, Apache Kafka clusters, and Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters
  • Data partitioning – Determine the partitioning of data that is ingested into Amazon S3 based on information extracted from the event payload
  • Multiple Elasticsearch indexes and custom document IDs – Fan out from a single input stream to different Elasticsearch indexes and explicitly control the document ID
  • Exactly-once semantics – Avoid duplicates when ingesting and delivering data between Apache Kafka, Amazon S3, and Amazon Elasticsearch Service (Amazon ES)

The following diagram illustrates this architecture.

The remainder of this post discusses how to implement streaming ETL architectures with Apache Flink and Kinesis Data Analytics. The architecture persists streaming data from one or multiple sources to different destinations and is extensible to your needs. This post does not cover additional filtering, enrichment, and aggregation transformations, although that is a natural extension for practical applications.

This post touches on how to build, deploy, and operate the Flink application with Kinesis Data Analytics, but does not go into these operational aspects in depth. It is enough to know that you can create a Kinesis Data Analytics application by uploading the compiled Flink application jar file to Amazon S3 and specifying some additional configuration options with the service. You can then execute the Kinesis Data Analytics application in a fully managed environment. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics developer guide.

Exploring a streaming ETL pipeline in your AWS account

Before you consider the implementation details and operational aspects, you should get a first impression of the streaming ETL pipeline in action. To create the required resources, deploy the following AWS CloudFormation template:

The template creates a Kinesis data stream and an Amazon Elastic Compute Cloud (Amazon EC2) instance to replay a historic data set into the data stream. This post uses data based on the public dataset obtained from the New York City Taxi and Limousine Commission. Each event describes a taxi trip made in New York City and includes timestamps for the start and end of a trip, information on the boroughs the trip started and ended in, and various details on the fare of the trip. A Kinesis Data Analytics application then reads the events and persists them to Amazon S3 in Parquet format and partitioned by event time.

Connect to the instance by following the link next to ConnectToInstance in the output section of the CloudFormation template that you executed previously. You can then start replaying a set of taxi trips into the data stream with the following code:

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

You can obtain this command with the correct parameters from the output section of the AWS CloudFormation template. The output section also points you to the S3 bucket to which events are persisted and an Amazon CloudWatch dashboard that lets you monitor the pipeline.

For more information about enabling the remaining combinations of sources and sinks, for example, Apache Kafka and Elasticsearch, see the GitHub repo.

Building a streaming ETL pipeline with Apache Flink

Now that you have seen the pipeline in action, you can dive into the technical aspects of how to implement the functionality with Apache Flink and Kinesis Data Analytics.

Reading and writing to private resources

Kinesis Data Analytics applications can access resources on the public internet and resources in a private subnet that is part of your VPC. By default, a Kinesis Data Analytics application only enables access to resources that you can reach over the public internet. This works well for resources that provide a public endpoint, for example, Kinesis data streams or Amazon Elasticsearch Service.

If your resources are private to your VPC, either for technical or security-related reasons, you can configure VPC connectivity for your Kinesis Data Analytics application. For example, MSK clusters are private; you cannot access them from the public internet. You may run your own Apache Kafka cluster on premises that is not exposed to the public internet and is only accessible from your VPC through a VPN connection. The same is true for other resources that are private to your VPC, such as relational databases or AWS PrivateLink-powered endpoints.

To enable VPC connectivity, configure the Kinesis Data Analytics application to connect to private subnets in your VPC. Kinesis Data Analytics creates elastic network interfaces in one or more of the subnets provided in your VPC configuration for the application, depending on the parallelism of the application. For more information, see Configuring Kinesis Data Analytics for Java Applications to access Resources in an Amazon VPC.

The following screenshot shows an example configuration of a Kinesis Data Analytics application with VPC connectivity:

The application can then access resources that have network connectivity from the configured subnets. This includes resources that are not directly contained in these subnets, which you can reach over a VPN connection or through VPC peering. This configuration also supports endpoints that are available over the public internet if you have a NAT gateway configured for the respective subnets. For more information, see Internet and Service Access for a VPC-Connected Kinesis Data Analytics for Java application.

Configuring Kinesis and Kafka sources

Apache Flink supports various data sources, including Kinesis Data Streams and Apache Kafka. For more information, see Streaming Connectors on the Apache Flink website.

To connect to a Kinesis data stream, first configure the Region and a credentials provider. As a general best practice, choose AUTO as the credentials provider. The application then uses temporary credentials from the role of the respective Kinesis Data Analytics application to read events from the specified data stream. This avoids baking static credentials into the application. In this context, it is also reasonable to increase the time between two read operations from the data stream. Each shard supports only five read transactions per second, so a single consumer that polls at the default interval of 200 milliseconds already uses up that quota. When you increase the interval to 1 second, the latency increases slightly, but multiple consumers can read from the same data stream. See the following code:

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

This config is passed to the FlinkKinesisConsumer with the stream name and a DeserializationSchema. This post uses the TripEventSchema for deserialization, which specifies how to deserialize a byte array that represents a Kinesis record into a TripEvent object. See the following code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

For more information, see TripEventSchema.java and TripEvent.java on GitHub. Apache Flink provides other more generic serializers that can deserialize data into strings or JSON objects.

Apache Flink is not limited to reading from Kinesis data streams. If you configure the Kinesis Data Analytics application’s VPC settings correctly, Apache Flink can also read events from Apache Kafka and MSK clusters. Specify a comma-separated list of broker and port pairs to use for the initial connection to your cluster. This config is passed to the FlinkKafkaConsumer with the topic name and a DeserializationSchema to create a source that reads from the respective topic of the Apache Kafka cluster. See the following code:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");

DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

The resulting DataStream contains TripEvent objects that have been deserialized from the data ingested into the data stream and Kafka topic, respectively. You can then use the data streams in combination with a sink to persist the events into their respective destination.

Persisting data in Amazon S3 with data partitioning

When you persist streaming data to Amazon S3, you may want to partition the data. You can substantially improve query performance of analytic tools by partitioning data because partitions that cannot contribute to a query can be pruned and therefore do not need to be read. For example, the right partitioning strategy can improve Amazon Athena query performance and cost by reducing the amount of data read for a query. You should choose to partition your data by the same attributes used in your application logic and query patterns. Furthermore, it is common when processing streaming data to include the time of an event, or event time, in your partitioning strategy. This contrasts with using the ingestion time or some other service-side timestamp that does not reflect the time an event occurred as accurately as event time.

For more information about taking data partitioned by ingestion time and repartitioning it by event time with Athena, see Analyze your Amazon CloudFront access logs at scale. However, you can directly partition the incoming data based on event time with Apache Flink by using the payload of events to determine the partitioning, which avoids an additional post-processing step. This capability is called data partitioning and is not limited to partitioning by time.

You can realize data partitioning with Apache Flink’s StreamingFileSink and BucketAssigner. For more information, see Streaming File Sink on the Apache Flink website.

When given a specific event, the BucketAssigner determines the corresponding partition prefix in the form of a string. See the following code:

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }

  ...
}
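To see which prefixes such an assigner produces, and why event time and arrival time can land in different partitions, here is a standalone sketch that uses only the JDK. It is not the code from the repo. The class and method names are hypothetical, and interpreting timestamps in UTC is an assumption made for the example.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

class PartitionPrefixSketch {

    // Builds the same prefix layout as the bucket assigner above from a location ID and a timestamp.
    // Assumption: timestamps are interpreted in UTC.
    static String prefixFor(int pickupLocationId, Instant timestamp) {
        ZonedDateTime utc = timestamp.atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT, "pickup_location=%03d/year=%04d/month=%02d",
                pickupLocationId, utc.getYear(), utc.getMonthValue());
    }

    public static void main(String[] args) {
        // A trip that started just before midnight on March 31 but was ingested in April.
        Instant pickupTime = Instant.parse("2018-03-31T23:59:30Z");
        Instant arrivalTime = Instant.parse("2018-04-01T00:00:05Z");

        System.out.println(prefixFor(42, pickupTime));  // pickup_location=042/year=2018/month=03
        System.out.println(prefixFor(42, arrivalTime)); // pickup_location=042/year=2018/month=04
    }
}
```

Partitioning by the timestamp in the payload keeps the trip in the March partition, whereas a service-side arrival timestamp would place it in April.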

The sink takes an argument for the S3 bucket as a destination path and a function that converts the TripEvent Java objects into a string. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

You can further customize the size of the objects you write to Amazon S3 and the frequency of the object creation with a rolling policy. You can configure your policy to have more events aggregated into fewer objects at the cost of increased latency, or vice versa. This can help avoid many small objects on Amazon S3, which can be a desirable trade-off for increased latency. A high number of objects can negatively impact the query performance of consumers reading the data from Amazon S3. For more information, see DefaultRollingPolicy on the Apache Flink website.
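The trade-off can be made concrete with a simplified model of a rolling policy that closes a part file based on size, age, or inactivity. This is a sketch of the idea only, not Flink's DefaultRollingPolicy implementation. The class name, field names, and thresholds are assumptions chosen for illustration.

```java
class RollingPolicySketch {
    final long maxPartSizeBytes;
    final long rolloverIntervalMillis;
    final long inactivityIntervalMillis;

    RollingPolicySketch(long maxPartSizeBytes, long rolloverIntervalMillis, long inactivityIntervalMillis) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMillis = rolloverIntervalMillis;
        this.inactivityIntervalMillis = inactivityIntervalMillis;
    }

    // A part file is closed when it is large enough, old enough, or has been idle long enough.
    boolean shouldRoll(long partSizeBytes, long createdAtMillis, long lastWriteAtMillis, long nowMillis) {
        return partSizeBytes >= maxPartSizeBytes
                || nowMillis - createdAtMillis >= rolloverIntervalMillis
                || nowMillis - lastWriteAtMillis >= inactivityIntervalMillis;
    }

    public static void main(String[] args) {
        // Hypothetical thresholds: 128 MiB, 15 minutes, 5 minutes.
        RollingPolicySketch policy =
                new RollingPolicySketch(128L * 1024 * 1024, 15 * 60_000L, 5 * 60_000L);

        // Small, young, recently written part file: keep it open.
        System.out.println(policy.shouldRoll(1024 * 1024, 0, 60_000, 120_000));  // false
        // Part file that has been open for 15 minutes: roll it.
        System.out.println(policy.shouldRoll(1024 * 1024, 0, 890_000, 900_000)); // true
    }
}
```

Larger thresholds mean fewer, bigger objects but a longer wait before data appears in Amazon S3. Smaller thresholds have the opposite effect.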

The number of output files that arrive in your S3 bucket per your rolling policy also depends on the parallelism of the StreamingFileSink and how you distribute events between Flink application operators. In the previous example, the Flink internal DataStream is partitioned by pickup location ID with the keyBy operator. The location ID is also used in the BucketAssigner as part of the prefix for objects that are written to Amazon S3. Therefore, the same node aggregates and persists all events with the same prefix, which results in particularly large objects on Amazon S3.
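The effect of the event distribution on the number of objects can be approximated with a small simulation. It counts how many distinct combinations of sink subtask and partition prefix receive events, because each combination corresponds to its own part file. This is a rough model: Flink assigns keys to subtasks through key groups and its own hash function, so the simple modulo below is a stand-in, and all names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

class PartFileCountSketch {

    // Counts distinct (subtask, prefix) pairs, which models the part files open at the same time.
    static int countPartFiles(int events, int locations, int parallelism, boolean keyByLocation) {
        Set<String> openPartFiles = new HashSet<>();
        for (int i = 0; i < events; i++) {
            int locationId = (i / parallelism) % locations;
            int subtask = keyByLocation
                    ? Math.floorMod(Integer.hashCode(locationId), parallelism) // same key, same subtask
                    : i % parallelism;                                         // round-robin distribution
            openPartFiles.add(subtask + ":pickup_location=" + locationId);
        }
        return openPartFiles.size();
    }

    public static void main(String[] args) {
        System.out.println(countPartFiles(1000, 10, 4, true));  // 10
        System.out.println(countPartFiles(1000, 10, 4, false)); // 40
    }
}
```

With keying, every prefix is written by exactly one subtask, so the same events end up in a quarter as many objects in this example, each correspondingly larger.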

Apache Flink uses multipart uploads under the hood when writing to Amazon S3 with the StreamingFileSink. In case of failures, Apache Flink may not be able to clean up incomplete multipart uploads. To avoid unnecessary storage fees, set up the automatic cleanup of incomplete multipart uploads by configuring appropriate lifecycle rules on the S3 bucket. For more information, see Important Considerations for S3 on the Apache Flink website and Example 8: Lifecycle Configuration to Abort Multipart Uploads.

Converting output to Apache Parquet

In addition to partitioning data before delivery to Amazon S3, you may want to compress it with a columnar storage format. Apache Parquet is a popular columnar format, which is well supported in the AWS ecosystem. It reduces the storage footprint and can substantially increase query performance and reduce cost.

The StreamingFileSink supports Apache Parquet and other bulk-encoded formats through a built-in BulkWriter factory. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forBulkFormat(
    new Path("s3://<bucket name>"),
    ParquetAvroWriters.forSpecificRecord(TripEvent.class)
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

For more information, see Bulk-encoded Formats on the Apache Flink website.

Persisting events works a bit differently when you use the Parquet conversion. When you enable Parquet conversion, you can only configure the StreamingFileSink with the OnCheckpointRollingPolicy, which commits completed part files to Amazon S3 only when a checkpoint is triggered. You therefore need to enable Apache Flink checkpoints in your Kinesis Data Analytics application to persist data to Amazon S3. Data only becomes visible to consumers when a checkpoint is triggered, so your delivery latency depends on how often your application is checkpointing.

Moreover, you previously just needed to generate a string representation of the data to write to Amazon S3. In contrast, the ParquetAvroWriters expects an Apache Avro schema for events. For more information, see the GitHub repo. You can use and extend the schema on the repo if you want an example.

In general, it is highly desirable to convert data into Parquet if you want to work with and query the persisted data effectively. Although it requires some additional effort, the benefits of the conversion outweigh these additional complexities compared to storing raw data.

Fanning out to multiple Elasticsearch indexes and custom document IDs

Amazon ES is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch clusters. A popular use case is to stream application and network log data into Amazon ES. These logs are documents in Elasticsearch parlance, and you can create one for every event and store it in an Elasticsearch index.

The Elasticsearch sink that Apache Flink provides is flexible and extensible. You can specify an index based on the payload of each event. This is useful when the stream contains different event types and you want to store the respective documents in different Elasticsearch indexes. With this capability, you can use a single sink and application, respectively, to write into multiple indexes. With newer Elasticsearch versions, a single index cannot contain multiple types. See the following code:

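SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
    // Derive the target index and the document ID from the event payload
    public IndexRequest createIndexRequest(TripEvent element) {
      String type = element.getType().toString();
      String tripId = Long.toString(element.getTripId());

      return Requests.indexRequest()
        .index(type)
        .type(type)
        .id(tripId)
        .source(TripEventSchema.toJson(element), XContentType.JSON);
    }

    // Required by the ElasticsearchSinkFunction interface: hand the request to the indexer
    @Override
    public void process(TripEvent element, RuntimeContext ctx, RequestIndexer indexer) {
      indexer.add(createIndexRequest(element));
    }
  }
);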

events.addSink(sink);

You can also explicitly set the document ID when you send documents to Elasticsearch. If an event with the same ID is ingested into Elasticsearch multiple times, it is overwritten rather than creating duplicates. This enables your writes to Elasticsearch to be idempotent. In this way, you can obtain exactly-once semantics of the entire architecture, even if your data sources only provide at-least-once semantics.
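Why a fixed document ID makes redelivery harmless can be shown with a map standing in for an Elasticsearch index. This is a minimal sketch under that assumption. The class name, document IDs, and bodies are invented for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IdempotentIndexSketch {

    // Applies deliveries given as {documentId, documentBody} pairs to an in-memory "index".
    static Map<String, String> index(List<String[]> deliveries) {
        Map<String, String> documents = new LinkedHashMap<>();
        for (String[] delivery : deliveries) {
            // Writing with an explicit ID overwrites an existing document instead of adding a new one.
            documents.put(delivery[0], delivery[1]);
        }
        return documents;
    }

    public static void main(String[] args) {
        List<String[]> deliveries = Arrays.asList(
                new String[] {"trip-1", "{\"fare\": 12.5}"},
                new String[] {"trip-2", "{\"fare\": 7.0}"},
                new String[] {"trip-1", "{\"fare\": 12.5}"} // redelivered after a failure
        );
        System.out.println(index(deliveries).size()); // 2
    }
}
```

Three deliveries result in two documents, so an at-least-once source combined with this kind of write yields the same index content as exactly-once delivery would.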

The AmazonElasticsearchSink used above is an extension of the Elasticsearch sink that comes with Apache Flink. The sink adds support to sign requests with IAM credentials so you can use the strong IAM-based authentication and authorization that is available from the service. To this end, the sink picks up temporary credentials from the Kinesis Data Analytics environment in which the application is running. It uses the Signature Version 4 method to add authentication information to the request that is sent to the Elasticsearch endpoint.
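For readers unfamiliar with Signature Version 4, the following sketch shows one step of the method: deriving the signing key from a secret access key through a chain of HMAC-SHA256 operations. It is not the sink's code, and a complete signature additionally requires building a canonical request and a string to sign. The class name and the credential, date, and Region values are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class SigV4SigningKeySketch {

    static byte[] hmacSha256(byte[] key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Signing key = HMAC chain over date (yyyyMMdd), Region, service, and the literal "aws4_request".
    static byte[] signingKey(String secretKey, String date, String region, String service) {
        byte[] kDate = hmacSha256(("AWS4" + secretKey).getBytes(StandardCharsets.UTF_8), date);
        byte[] kRegion = hmacSha256(kDate, region);
        byte[] kService = hmacSha256(kRegion, service);
        return hmacSha256(kService, "aws4_request");
    }

    public static void main(String[] args) {
        // Placeholder values; "es" is the service name used for Amazon ES.
        byte[] key = signingKey("EXAMPLE-SECRET-KEY", "20200221", "eu-west-1", "es");
        System.out.println(key.length); // 32
    }
}
```

Because the key is scoped to a date, Region, and service, a leaked signing key is far less useful than the underlying secret access key.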

Leveraging exactly-once semantics

You can obtain exactly-once semantics by combining an idempotent sink with at-least-once semantics, but that is not always feasible. For instance, if you want to replicate data from one Apache Kafka cluster to another or persist transactional CDC data from Apache Kafka to Amazon S3, you may not be able to tolerate duplicates in the destination, but neither of these sinks is idempotent.

Apache Flink natively supports exactly-once semantics. Kinesis Data Analytics implicitly enables exactly-once mode for checkpoints. To obtain end-to-end exactly-once semantics, you need to enable checkpoints for the Kinesis Data Analytics application and choose a connector that supports exactly-once semantics, such as the StreamingFileSink. For more information, see Fault Tolerance Guarantees of Data Sources and Sinks on the Apache Flink website.

There are some side effects to using exactly-once semantics. For example, end-to-end latency increases for several reasons. First, you can only commit the output when a checkpoint is triggered. This is the same as the latency increase that occurred when you turned on Parquet conversion. The default checkpoint interval is 1 minute, which you can decrease. However, obtaining sub-second delivery latencies is difficult with this approach.

Also, the details of end-to-end exactly-once semantics are subtle. Although the Flink application may read in an exactly-once fashion from a data stream, duplicates may already be part of the stream, so you can only obtain at-least-once semantics of the entire application. For Apache Kafka as a source and sink, different caveats apply. For more information, see Caveats on the Apache Flink website.

Be sure that you understand all the details of the entire application stack before you take a hard dependency on exactly-once semantics. In general, if your application can tolerate at-least-once semantics, it’s a good idea to use that semantic instead of relying on stronger semantics that you don’t need.

Using multiple sources and sinks

One Flink application can read data from multiple sources and persist data to multiple destinations. This is interesting for several reasons. First, you can persist the data or different subsets of the data to different destinations. For example, you can use the same application to replicate all events from your on-premises Apache Kafka cluster to an MSK cluster. At the same time, you can deliver specific, valuable events to an Elasticsearch cluster.

Second, you can use multiple sinks to increase the robustness of your application. For example, your application that applies filters and enriches streaming data can also archive the raw data stream. If something goes wrong with your more complex application logic, Amazon S3 still has the raw data, which you can use to backfill the sink.

However, there are some trade-offs. When you bundle many functionalities in a single application, you increase the blast radius of failures. If a single component of the application fails, the entire application fails and you need to recover it from the last checkpoint. This causes some downtime and increased delivery latency to all delivery destinations in the application. Also, a single large application is often harder to maintain and to change. You should strike a balance between adding functionality or creating additional Kinesis Data Analytics applications.

Operational aspects

When you run the architecture in production, you set out to execute a single Flink application continuously and indefinitely. It is crucial to implement monitoring and proper alarming to make sure that the pipeline is working as expected and the processing can keep up with the incoming data. Ideally, the pipeline should adapt to changing throughput conditions and send notifications if it fails to deliver data from the sources to the destinations.

Some aspects require specific attention from an operational perspective. The following section provides some ideas and further references on how you can increase the robustness of your streaming ETL pipelines.

Monitoring and scaling sources

The data stream and the MSK cluster, respectively, are the entry point to the entire architecture. They decouple the data producers from the rest of the architecture. To avoid any impact to data producers, which you often cannot control directly, you need to scale the input stream of the architecture appropriately and make sure that it can ingest messages at any time.

Kinesis Data Streams uses a throughput provisioning model based on shards. Each shard provides a certain read and write capacity. From the number of provisioned shards, you can derive the maximum throughput of the stream in terms of ingested and emitted events and data volume per second. For more information, see Kinesis Data Streams Quotas.

Kinesis Data Streams exposes metrics through CloudWatch that report on these characteristics and indicate whether the stream is over- or under-provisioned. You can use the IncomingBytes and IncomingRecords metrics to scale the stream proactively, or you can use the WriteProvisionedThroughputExceeded metrics to scale the stream reactively. Similar metrics exist for data egress, which you should also monitor. For more information, see Monitoring the Amazon Kinesis Data Streams with Amazon CloudWatch.

The following graph shows some of these metrics for the data stream of the example architecture. On average the Kinesis data stream receives 2.8 million events and 1.1 GB of data every minute.

You can even automate the scaling of your Kinesis data streams. For more information, see Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount.
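As a rough sketch of how to derive a shard count from the ingest figures, the following function takes the larger of the record-based and byte-based requirements. The per-shard write limits (1,000 records and 1 MB per second) are assumptions taken from the published quotas; verify them against the Kinesis Data Streams Quotas page. The `headroom` parameter is a hypothetical safety factor.

```python
import math

# Assumed per-shard write limits; check the Kinesis Data Streams quotas page.
SHARD_WRITE_BYTES_PER_SEC = 1_000_000
SHARD_WRITE_RECORDS_PER_SEC = 1_000


def required_shards(records_per_sec: float, bytes_per_sec: float,
                    headroom: float = 1.0) -> int:
    """Return the minimum shard count that satisfies both write limits."""
    by_records = records_per_sec * headroom / SHARD_WRITE_RECORDS_PER_SEC
    by_bytes = bytes_per_sec * headroom / SHARD_WRITE_BYTES_PER_SEC
    return max(1, math.ceil(max(by_records, by_bytes)))


# Average ingest of the example architecture: 2.8 million events and
# 1.1 GB of data per minute, converted to per-second rates.
shards = required_shards(2_800_000 / 60, 1.1e9 / 60)
```

For these averages, the record rate rather than the data volume is the binding limit. Because averages hide peaks, a production estimate would apply some headroom on top.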

Apache Kafka and Amazon MSK use a node-based provisioning model. Amazon MSK also exposes metrics through CloudWatch, including metrics that indicate how much data and how many events are ingested into the cluster. For more information, see Amazon MSK Metrics for Monitoring with CloudWatch.

In addition, you can also enable open monitoring with Prometheus for MSK clusters. It is a bit harder to know the total capacity of the cluster, and you often need benchmarking to know when you should scale. For more information about important metrics to monitor, see Monitoring Kafka on the Confluent website.

Monitoring and scaling the Kinesis Data Analytics application

The Flink application is the central core of the architecture. Kinesis Data Analytics executes it in a managed environment, and you want to make sure that it continuously reads data from the sources and persists data in the data sinks without falling behind or getting stuck.

When the application falls behind, it is often an indicator that it is not scaled appropriately. Two important metrics to track the progress of the application are millisBehindLatest (when the application is reading from a Kinesis data stream) and records-lag-max (when it is reading from Apache Kafka and Amazon MSK). These metrics not only indicate that data is read from the sources, but they also tell if data is read fast enough. If the values of these metrics are continuously growing, the application is continuously falling behind, which may indicate that you need to scale up the Kinesis Data Analytics application. For more information, see Kinesis Data Streams Connector Metrics and Application Metrics.
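One simple way to turn "continuously growing" into a check is to look at the most recent samples of the lag metric and flag the application only if the lag never decreases across that window. The following is a sketch under that assumption; the function name, the window size, and the sample values are hypothetical, and in practice a CloudWatch alarm on the metric would typically play this role.

```python
from typing import Sequence


def is_falling_behind(lag_samples: Sequence[float], min_samples: int = 5) -> bool:
    """Return True if the latest samples never decrease and end higher than they start."""
    recent = list(lag_samples)[-min_samples:]
    if len(recent) < min_samples:
        return False  # not enough data to judge a trend
    non_decreasing = all(b >= a for a, b in zip(recent, recent[1:]))
    return non_decreasing and recent[-1] > recent[0]


# Lag values in milliseconds, one sample per minute (made-up numbers).
growing = [100, 400, 900, 1500, 2600]
checkpoint_spike = [200, 7000, 300, 150, 100]
```

A short spike that recovers, such as one caused by checkpointing, does not trip this check, whereas a steadily increasing lag does.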

The following graph shows the metrics for the example application in this post. During checkpointing, the maximum millisBehindLatest metric occasionally spikes up to 7 seconds. However, because the reported average of the metric is less than 1 second and the application immediately catches up to the tip of the stream again, it is not a concern for this architecture.

Although the lag of the application is one of the most important metrics to monitor, there are other relevant metrics that Apache Flink and Kinesis Data Analytics expose. For more information, see Monitoring Apache Flink Applications 101 on the Apache Flink website.

Monitoring sinks

To verify that sinks are receiving data and, depending on the sink type, do not run out of storage, you need to monitor sinks closely.

You can enable detailed metrics for your S3 buckets that track the number of requests and data uploaded into the bucket with 1-minute granularity. For more information, see Monitoring Metrics with Amazon CloudWatch. The following graph shows these metrics for the S3 bucket of the example architecture:

When the architecture persists data into a Kinesis data stream or a Kafka topic, it acts as a producer, so the same recommendations as for monitoring and scaling sources apply. For more information about operating and monitoring Amazon Elasticsearch Service in production environments, see Amazon Elasticsearch Service Best Practices.

Handling errors

“Failures are a given and everything eventually fails over time”, so you should expect the application to fail at some point. For example, an underlying node of the infrastructure that Kinesis Data Analytics manages might fail, or intermittent timeouts on the network can prevent the application from reading from sources or writing to sinks. When this happens, Kinesis Data Analytics restarts the application and resumes processing by recovering from the latest checkpoint. Because the raw events have been persisted in a data stream or Kafka topic, the application can reread the events that have been persisted in the stream between the last checkpoint and when it recovered and continue standard processing.

These kinds of failures are rare and the application can gracefully recover without sacrificing processing semantics, including exactly-once semantics. However, other failure modes need additional attention and mitigation.

When an exception is thrown anywhere in the application code, for example, in the component that contains the logic for parsing events, the entire application crashes. As before, the application eventually recovers, but if the exception is from a bug in your code that a specific event always hits, it results in an infinite loop. After recovering from the failure, the application rereads the event, because it was not processed successfully before, and crashes again. The process starts again and repeats indefinitely, which effectively blocks the application from making any progress.

Therefore, you want to catch and handle exceptions in the application code to avoid crashing the application. If there is a persistent problem that you cannot resolve programmatically, you can use side outputs to redirect the problematic raw events to a secondary data stream, which you can persist to a dead letter queue or an S3 bucket for later inspection. For more information, see Side Outputs on the Apache Flink website.
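The pattern can be illustrated in plain Python. This is a conceptual sketch and not the Flink side output API: events that fail to parse are collected separately, together with the error, instead of raising and crashing the pipeline. The function name, the shape of the dead letter records, and the sample inputs are assumptions.

```python
import json
from typing import Iterable, List, Tuple


def parse_events(raw_events: Iterable[str]) -> Tuple[List[dict], List[dict]]:
    """Split raw events into parsed events and dead letters."""
    parsed, dead_letters = [], []
    for raw in raw_events:
        try:
            event = json.loads(raw)
            if not isinstance(event, dict):
                raise ValueError("expected a JSON object")
            parsed.append(event)
        except ValueError as err:  # json.JSONDecodeError subclasses ValueError
            # Keep the untouched raw event so it can be inspected or replayed later.
            dead_letters.append({"raw": raw, "error": str(err)})
    return parsed, dead_letters


good, bad = parse_events(['{"id": 1}', "not json", "[1, 2]"])
```

Keeping the raw payload next to the error message makes it possible to fix the bug and reprocess the affected events later.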

When the application is stuck and cannot make any progress, it is at least visible in the metrics for application lag. If your streaming ETL pipeline filters or enriches events, failures may be much more subtle, and you may only notice them long after the events have been ingested. For instance, due to a bug in the application, you may accidentally drop important events or corrupt their payload in unintended ways. Kinesis Data Streams stores events for up to 7 days and, though technically possible, Apache Kafka is often not configured to store events indefinitely either. If you don’t identify the corruption quickly enough, you risk losing information when the retention of the raw events expires.

To protect against this scenario, you can persist the raw events to Amazon S3 before you apply any additional transformations or processing to them. You can keep the raw events and reprocess or replay them into the stream if you need to. To integrate the functionality into the application, add a second sink that just writes to Amazon S3. Alternatively, use a separate application that only reads and persists the raw events from the stream, at the cost of running and paying for an additional application.

When to choose what

AWS provides many services that work with streaming data and can perform streaming ETL. Amazon Kinesis Data Firehose can ingest, process, and persist streaming data into a range of supported destinations. There is a significant overlap of the functionality between Kinesis Data Firehose and the solution in this post, but there are different reasons to use one or the other.

As a rule of thumb, use Kinesis Data Firehose whenever it fits your requirements. The service is built with simplicity and ease of use in mind. To use Kinesis Data Firehose, you just need to configure the service. You can use Kinesis Data Firehose for streaming ETL use cases with no code, no servers, and no ongoing administration. Moreover, Kinesis Data Firehose comes with many built-in capabilities, and its pricing model allows you to only pay for the data processed and delivered. If you don’t ingest data into Kinesis Data Firehose, you pay nothing for the service.

In contrast, the solution in this post requires you to create, build, and deploy a Flink application. Moreover, you need to think about monitoring and how to obtain a robust architecture that is not only tolerant against infrastructure failures but also resilient against application failures and bugs. However, this added complexity unlocks many advanced capabilities, which your use case may require. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

What’s next?

This post discussed how to build a streaming ETL pipeline with Apache Flink and Kinesis Data Analytics. It focused on how to build an extensible solution that addresses some advanced use cases for streaming ingest while maintaining low operational overhead. The solution allows you to quickly enrich, transform, and load your streaming data into your data lake, data store, or another analytical tool without the need for an additional ETL step. The post also explored ways to extend the application with monitoring and error handling.

You should now have a good understanding of how to build streaming ETL pipelines on AWS. You can start capitalizing on your time-sensitive events by using a streaming ETL pipeline that makes valuable information quickly accessible to consumers. You can tailor the format and shape of this information to your use case without adding the substantial latency of traditional batch-based ETL processes.

 


About the Author

Steffen Hausmann is a Specialist Solutions Architect for Analytics at AWS. He works with customers around the globe to design and build streaming architectures so that they can get value from analyzing their streaming data. He holds a doctorate degree in computer science from the University of Munich and in his free time, he tries to lure his daughters into tech with cute stickers he collects at conferences. You can follow his ruthless attempts on Twitter (@sthmmm).

 

 

Application analytics pipeline with Amazon EventBridge

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/application-analytics-pipeline-with-amazon-eventbridge/

This post is courtesy of Rajdeep Tarat, Solutions Architect and Venugopal Pai, Solutions Architect

Customers across industry verticals collect, analyze, and derive insights from end-user application analytics using solutions such as Google Analytics and MixPanel. While these solutions provide built-in dashboards for marketing analytics, it can be difficult to reuse the raw event data.

Setting up a pipeline to move the raw event data into AWS opens up possibilities for various rule-based, statistical, and machine learning algorithms to derive deep insights about end-user behavior. Additionally, the raw event data can be enriched with other transactional data points available within the customer’s AWS environment.

This post uses the Segment Partner integration in Amazon EventBridge to pipe the data into your AWS environment. Segment allows you to collect, unify, and connect end-user application analytics into AWS using Amazon EventBridge as a destination.

Segment already supports direct, optimized connections to many AWS services such as Amazon Redshift, Amazon Personalize, Amazon Kinesis, Amazon Kinesis Data Firehose, AWS Lambda, and Amazon S3. The EventBridge destination is a good choice for customers who want the flexibility and centralization that EventBridge offers.

EventBridge makes it easy to build scalable event-driven application pipelines by handling event ingestion, delivery, security, authorization, and error handling for you. The architecture of this pipeline is shown below:

Segment architecture

In the diagram, end-user applications send the data into Segment, which is routed to each of the configured destinations (for example, EventBridge). Once the data reaches EventBridge, it is again routed to multiple targets. With this approach you can continue using existing solutions supported by Segment in parallel to the Amazon EventBridge destination.

This architecture makes the pipeline highly extensible and modular. Firstly, you can configure multiple Segment destinations to fan out the event data into other existing solutions in parallel to EventBridge. Marketing teams can continue to use their existing tools without any disruptions while the data is seamlessly pumped into AWS. Within the AWS Cloud, EventBridge can again route the event data to up to five targets per rule.

The following section provides a walkthrough of setting up the Segment integration with EventBridge, and configuring two targets within the AWS Cloud.

  • The first target uses an Amazon Kinesis Data Firehose to deliver the data to an S3 bucket. From the S3 bucket, multiple AWS services can use the data (learn more about using S3 as a data lake).
  • The second target posts the event data to an SNS topic. From here, the data can be consumed by subscribers for the topic.

Walkthrough

To set up the pipeline, you must configure the Segment partner integration in EventBridge, and then set up the targets where analytics data is sent.

Amazon EventBridge – Segment partner integration:

  1. From the Amazon EventBridge console, navigate to the Partner Event Sources > Segment setup page. Copy your AWS Account ID from here.
    Segment setup
  2. On the Segment destination setup page, use the Amazon EventBridge integration. Enter the AWS Account ID and select a Region (learn more about setting up a Segment destination).
    EventBridge settings

Create the event bus:

  1. After linking the Segment Destination with the AWS Account ID, fire a test event from the Event Tester in the Segment Dashboard. This creates a Partner Event Source and an event bus with the same name.
    Event Tester
  2. After the first test event is fired, the Partner Event Source and the corresponding event bus are created in the EventBridge console.
    Partner event sources

Create rules:

  1. A rule watches for incoming events and routes them to specific targets that are configured. Start by creating a new rule and entering a name.
    Creating rules
  2. For Event Pattern, select the Predefined pattern by Service, and select Service Partners > Segment.
    Define event pattern
  3. Under Select event bus, select the Custom or partner event bus, and the name of the event bus created.
    Select event bus

Configuring multiple targets for the event bus:

  1. For Kinesis streams, select Kinesis stream from the Target dropdown, and the name of the stream. For more details on creating a Kinesis data stream, read this documentation.
    Select targets
  2. For SNS topic, choose Add Target and repeat the same steps to add an SNS topic instead. For more details on creating an SNS topic, read this documentation.
    SNS as target
  3. You can optionally tag the resource, then choose Create.

The pipeline is ready to send data to the targets configured via the event bus. You can now send test events from the Segment dashboard and verify delivery by monitoring Kinesis Data Firehose or by setting up subscribers for the SNS topic.

Conclusion

This post shows how customers can capture end-user application analytics using the partner solution Segment in real time, and ingest data into Amazon EventBridge. The data routing is made extensible using multiple Segment destinations (for third-party solutions), and using multiple rules in EventBridge (for multiple destinations within the AWS Cloud).

To learn more about Amazon EventBridge integrations, read the EventBridge documentation.

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion. We explain these challenges and show how we overcame them.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose.
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.

The following diagram illustrates this workflow.

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.
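To make step 2 of the workflow more concrete, the following sketch converts a DynamoDB stream image, in which every attribute is wrapped in a type tag such as S, N, or SS, into a plain structure that can be serialized to JSON. It is an illustration only and not the code of the lambda-streams-to-firehose project. The function names and the sample image are hypothetical, and binary types (B, BS) are deliberately left out. The boto3 library also provides a TypeDeserializer class in boto3.dynamodb.types for this purpose.

```python
import json
from decimal import Decimal


def from_dynamodb(attr: dict):
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    (type_tag, value), = attr.items()  # each attribute has exactly one type tag
    if type_tag in ("S", "BOOL"):
        return value
    if type_tag == "N":
        number = Decimal(value)
        return int(number) if number == number.to_integral_value() else float(number)
    if type_tag == "NULL":
        return None
    if type_tag == "SS":
        return sorted(value)  # a String Set becomes a JSON array
    if type_tag == "NS":
        return sorted(from_dynamodb({"N": n}) for n in value)
    if type_tag == "L":
        return [from_dynamodb(item) for item in value]
    if type_tag == "M":
        return {key: from_dynamodb(item) for key, item in value.items()}
    raise ValueError(f"unsupported DynamoDB type tag: {type_tag}")


def image_to_json(new_image: dict) -> str:
    """Serialize a stream record's NewImage as a JSON string."""
    return json.dumps({name: from_dynamodb(attr) for name, attr in new_image.items()},
                      sort_keys=True)


sample_image = {"user": {"S": "u1"}, "count": {"N": "3"}, "tags": {"SS": ["b", "a"]}}
record = image_to_json(sample_image)
```

Note that sets have no direct JSON equivalent, so they are emitted as arrays here. This mismatch between set and array types is the same one that surfaces in the Data Catalog, as the next section describes.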

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis Data Firehose tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type>
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we had to convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis Data Firehose transformation to Parquet completed successfully.
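The type mapping itself is a small string rewrite, which the following sketch isolates as a pure function. The function name and the example type strings are hypothetical. The word boundary in the regular expression is an added precaution, assumed here so that a field name that merely ends in "set" is left alone.

```python
import re

# Match "set<" only where "set" starts a word, for example after ":" or "<".
_SET_TYPE = re.compile(r"\bset<")


def to_hive_type(glue_type: str) -> str:
    """Rewrite set<...> as array<...>, including inside nested types."""
    return _SET_TYPE.sub("array<", glue_type)


flat = to_hive_type("set<string>")
nested = to_hive_type("struct<tags:set<string>,used:bigint>")
```

Because the replacement applies to every occurrence, sets nested inside struct or array types are converted as well.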

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. The GlueTableChanged rule in EventBridge listens for any changes to the Data Catalog tables. After we receive the event that there was a change to the table, we trigger the Lambda function.
  4. The Lambda function uses AWS SDK to update the Glue Catalog table using the glue.update_table() API to replace occurrences of set<data_type> with array<data_type>.

To set up EventBridge, we set Event pattern to “Pre-defined pattern by service”. For the service provider, we selected AWS, and for the service, we selected Glue. For the event type, we selected “Glue Data Catalog Table State Change”. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.
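Expressed as JSON, these console selections correspond to an event pattern along the following lines. The pattern is inferred from the source and detail-type fields of the sample event payload shown in the Lambda code, so treat it as an assumption and compare it with the pattern that the console generates. The `matches` helper is a deliberately simplified, hypothetical matcher used only to illustrate how the pattern selects events; it is not how EventBridge evaluates patterns internally.

```python
import json

# Assumed event pattern for Glue Data Catalog table changes.
GLUE_TABLE_CHANGE_PATTERN = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Data Catalog Table State Change"],
}


def matches(pattern: dict, event: dict) -> bool:
    """Simplified check: each top-level pattern key must list the event's value."""
    return all(event.get(key) in allowed for key, allowed in pattern.items())


sample_event = {
    "source": "aws.glue",
    "detail-type": "Glue Data Catalog Table State Change",
    "detail": {
        "databaseName": "ddb-glue-fh",
        "typeOfChange": "UpdateTable",
        "tableName": "ddb_glu_fh_sample",
    },
}

pattern_json = json.dumps(GLUE_TABLE_CHANGE_PATTERN, indent=2)
```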

The following is the baseline Lambda code:

# This is NOT production worthy code please modify and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def table_contains_set(databaseName, tableName):
    """Replace set<...> column types with array<...> in a Glue table.

    Returns True if the table was modified, False otherwise.
    """
    # Fetch the Glue Data Catalog description of the table
    response = glue.get_table(DatabaseName=databaseName, Name=tableName)
    logger.info(response)
    table = response['Table']

    # Loop through all the columns of the table
    isModified = False
    for column in table['StorageDescriptor']['Columns']:
        logger.info("## Column: " + str(column['Name']))
        # If the column data type contains set<, change it to array<
        if "set<" in column['Type']:
            column['Type'] = column['Type'].replace("set<", "array<")
            isModified = True
            logger.info(column['Type'])

    if isModified:
        # get_table returns read-only fields that the TableInput of update_table
        # does not accept; drop them so that the API call validates.
        # Extend this tuple if your response contains other read-only fields.
        for key in ('DatabaseName', 'CreateTime', 'UpdateTime', 'CreatedBy',
                    'IsRegisteredWithLakeFormation'):
            table.pop(key, None)
        glue.update_table(DatabaseName=databaseName, TableInput=table, SkipArchive=True)

    logger.info("============ ### =============")
    logger.info(response)

    return isModified
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client-facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego.

 

 

 

Tarik Makota is a Principal Solutions Architect with Amazon Web Services. He provides technical guidance, design advice, and thought leadership to AWS customers across the US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

 

ICYMI: Serverless Q4 2019

Post Syndicated from Rob Sutter original https://aws.amazon.com/blogs/compute/icymi-serverless-q4-2019/

Welcome to the eighth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

The three months comprising the fourth quarter of 2019

AWS re:Invent

AWS re:Invent 2019

re:Invent 2019 dominated the fourth quarter at AWS. The serverless team presented a number of talks, workshops, and builder sessions to help customers increase their skills and deliver value more rapidly to their own customers.

Serverless talks from re:Invent 2019

Chris Munns presenting 'Building microservices with AWS Lambda' at re:Invent 2019

We presented dozens of sessions showing how customers can improve their architecture and agility with serverless. Here are some of the most popular.

Videos

Decks

You can also find decks for many of the serverless presentations and other re:Invent presentations on our AWS Events Content.

AWS Lambda

For developers needing greater control over performance of their serverless applications at any scale, AWS Lambda announced Provisioned Concurrency at re:Invent. This feature enables Lambda functions to execute with consistent start-up latency making them ideal for building latency sensitive applications.

As shown in the following graph, provisioned concurrency reduces tail latency, directly impacting response times and providing a more responsive end user experience.

Graph showing performance enhancements with AWS Lambda Provisioned Concurrency

Lambda rolled out enhanced VPC networking to 14 additional Regions around the world. This change brings dramatic improvements to startup performance for Lambda functions running in VPCs due to more efficient usage of elastic network interfaces.

Illustration of AWS Lambda VPC to VPC NAT

New VPC to VPC NAT for Lambda functions

Lambda now supports three additional runtimes: Node.js 12, Java 11, and Python 3.8. Each of these new runtimes has new version-specific features and benefits, which are covered in the linked release posts. Like the Node.js 10 runtime, these new runtimes are all based on an Amazon Linux 2 execution environment.

Lambda released a number of controls for both stream and async-based invocations:

  • You can now configure error handling for Lambda functions consuming events from Amazon Kinesis Data Streams or Amazon DynamoDB Streams. It’s now possible to limit the retry count, limit the age of records being retried, configure a failure destination, or split a batch to isolate a problem record. These capabilities help you deal with potential “poison pill” records that would previously cause streams to pause in processing.
  • For asynchronous Lambda invocations, you can now set the maximum event age and retry attempts on the event. If either configured condition is met, the event can be routed to a dead letter queue (DLQ), Lambda destination, or it can be discarded.

AWS Lambda Destinations is a new feature that allows developers to designate an asynchronous target for Lambda function invocation results. You can set separate destinations for success and failure. This unlocks new patterns for distributed event-based applications and can replace custom code previously used to manage routing results.

Illustration depicting AWS Lambda Destinations with success and failure configurations

Lambda Destinations

Lambda also now supports setting a Parallelization Factor, which allows you to set multiple Lambda invocations per shard for Kinesis Data Streams and DynamoDB Streams. This enables faster processing without the need to increase your shard count, while still guaranteeing the order of records processed.

Illustration of multiple AWS Lambda invocations per Kinesis Data Streams shard

Lambda Parallelization Factor diagram

Lambda introduced Amazon SQS FIFO queues as an event source. “First in, first out” (FIFO) queues guarantee the order of record processing, unlike standard queues. FIFO queues support message batching via a MessageGroupId attribute that supports parallel Lambda consumers of a single FIFO queue, enabling high throughput of record processing by Lambda.

Lambda now supports Environment Variables in the AWS China (Beijing) Region and the AWS China (Ningxia) Region.

You can now view percentile statistics for the duration metric of your Lambda functions. Percentile statistics show the relative standing of a value in a dataset, and are useful when applied to metrics that exhibit large variances. They can help you understand the distribution of a metric, discover outliers, and find hard-to-spot situations that affect customer experience for a subset of your users.
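To illustrate why percentiles reveal more than an average, here is a minimal Python sketch. The sample durations and the `duration_percentiles` helper are hypothetical; they are not part of Lambda or CloudWatch, and the sketch only mimics the kind of statistic the console displays.

```python
import statistics


def duration_percentiles(durations_ms):
    """Return the p50, p90, and p99 of a list of durations in milliseconds."""
    # quantiles(n=100) returns the 99 cut points p1..p99 of the data.
    cuts = statistics.quantiles(durations_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p90": cuts[89], "p99": cuts[98]}


# Hypothetical sample: 95 fast invocations and 5 slow outliers.
sample = [100] * 95 + [2000] * 5
stats = duration_percentiles(sample)
average = statistics.mean(sample)

print("percentiles:", stats)
print("average:", average)
```

In this made-up sample the average sits well above the median yet far below the p99, which is why a single average can hide the slow invocations that a subset of users experiences.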

Amazon API Gateway

Screen capture of creating an Amazon API Gateway HTTP API in the AWS Management Console

Amazon API Gateway announced the preview of HTTP APIs. In addition to significant performance improvements, most customers see an average cost savings of 70% when compared with API Gateway REST APIs. With HTTP APIs, you can create an API in four simple steps. Once the API is created, additional configuration for CORS and JWT authorizers can be added.

AWS SAM CLI

Screen capture of the new 'sam deploy' process in a terminal window

The AWS SAM CLI team simplified the bucket management and deployment process in the SAM CLI. You no longer need to manage a bucket for deployment artifacts – SAM CLI handles this for you. The deployment process has also been streamlined from multiple flagged commands to a single command, sam deploy.

AWS Step Functions

One powerful feature of AWS Step Functions is its ability to integrate directly with AWS services without you needing to write complicated application code. In Q4, Step Functions expanded its integration with Amazon SageMaker to simplify machine learning workflows. Step Functions also added a new integration with Amazon EMR, making EMR big data processing workflows faster to build and easier to monitor.

Screen capture of an AWS Step Functions step with Amazon EMR

Step Functions step with EMR

Step Functions now provides the ability to track state transition usage by integrating with AWS Budgets, allowing you to monitor trends and react to usage on your AWS account.

You can now view CloudWatch Metrics for Step Functions at a one-minute frequency. This makes it easier to set up detailed monitoring for your workflows. You can use one-minute metrics to set up CloudWatch Alarms based on your Step Functions API usage, Lambda functions, service integrations, and execution details.

Step Functions now supports higher throughput workflows, making it easier to coordinate applications with high event rates. This increases the limits to 1,500 state transitions per second and a default start rate of 300 state machine executions per second in US East (N. Virginia), US West (Oregon), and Europe (Ireland). Click the above link to learn more about the limit increases in other Regions.

Screen capture of choosing Express Workflows in the AWS Management Console

Step Functions released AWS Step Functions Express Workflows. With the ability to support event rates greater than 100,000 per second, this feature is designed for high-performance workloads at a reduced cost.

Amazon EventBridge

Illustration of the Amazon EventBridge schema registry and discovery service

Amazon EventBridge announced the preview of the Amazon EventBridge schema registry and discovery service. This service allows developers to automate discovery and cataloging event schemas for use in their applications. Additionally, once a schema is stored in the registry, you can generate and download a code binding that represents the schema as an object in your code.

Amazon SNS

Amazon SNS now supports the use of dead letter queues (DLQ) to help capture unhandled events. By enabling a DLQ, you can catch events that are not processed and re-submit them or analyze them to locate processing issues.

Amazon CloudWatch

Amazon CloudWatch announced Amazon CloudWatch ServiceLens to provide a “single pane of glass” to observe health, performance, and availability of your application.

Screenshot of Amazon CloudWatch ServiceLens in the AWS Management Console

CloudWatch ServiceLens

CloudWatch also announced a preview of a capability called Synthetics. CloudWatch Synthetics allows you to test your application endpoints and URLs using configurable scripts that mimic what a real customer would do. This enables the outside-in view of your customers’ experiences, and your service’s availability from their point of view.

CloudWatch introduced Embedded Metric Format, which helps you ingest complex high-cardinality application data as logs and easily generate actionable metrics. You can publish these metrics from your Lambda function by using the PutLogEvents API or using an open source library for Node.js or Python applications.
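As a rough illustration of what such a log entry looks like, the following Python sketch builds one Embedded Metric Format record by hand using only the standard library. The namespace, dimension, and metric names are hypothetical, and `build_emf_log` is a helper written for this example rather than part of the open source libraries mentioned above.

```python
import json
import time


def build_emf_log(namespace, dimensions, metric_name, value, unit="Milliseconds"):
    """Build one Embedded Metric Format log line as a JSON string."""
    record = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),  # milliseconds since epoch
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    # One dimension set made of all provided dimension names.
                    "Dimensions": [list(dimensions.keys())],
                    "Metrics": [{"Name": metric_name, "Unit": unit}],
                }
            ],
        },
        # The metric value lives at the top level, keyed by its name.
        metric_name: value,
    }
    # Dimension values also live at the top level of the record.
    record.update(dimensions)
    return json.dumps(record)


# Hypothetical names, for illustration only.
line = build_emf_log(
    "ExampleApp", {"FunctionName": "scoreProcessor"}, "ProcessingLatency", 42
)
print(line)
```

Inside a Lambda function, printing a line like this to standard output is typically enough for it to reach CloudWatch Logs, where the metric definition in the `_aws` block is picked up.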

Finally, CloudWatch announced a preview of Contributor Insights, a capability to identify who or what is impacting your system or application performance by identifying outliers or patterns in log data.

AWS X-Ray

AWS X-Ray announced trace maps, which enable you to map the end-to-end path of a single request. Identifiers show issues and how they affect other services in the request’s path. These can help you to identify and isolate service points that are causing degradation or failures.

X-Ray also announced support for Amazon CloudWatch Synthetics, currently in preview. CloudWatch Synthetics on X-Ray supports tracing canary scripts throughout the application, providing metrics on performance or application issues.

Screen capture of AWS X-Ray Service map in the AWS Management Console

X-Ray Service map with CloudWatch Synthetics

Amazon DynamoDB

Amazon DynamoDB announced support for customer-managed customer master keys (CMKs) to encrypt data in DynamoDB. This allows you to bring your own key (BYOK), giving you full control over how you encrypt and manage the security of your DynamoDB data.

It is now possible to add global replicas to existing DynamoDB tables to provide enhanced availability across the globe.

Another new DynamoDB capability to identify frequently accessed keys and database traffic trends is currently in preview. With this, you can now more easily identify “hot keys” and understand usage of your DynamoDB tables.

Screen capture of Amazon CloudWatch Contributor Insights for DynamoDB in the AWS Management Console

CloudWatch Contributor Insights for DynamoDB

DynamoDB also released adaptive capacity. Adaptive capacity helps you handle imbalanced workloads by automatically isolating frequently accessed items and shifting data across partitions to rebalance them. This helps reduce cost by enabling you to provision throughput for a more balanced workload instead of over provisioning for uneven data access patterns.

Amazon RDS

Amazon Relational Database Service (RDS) announced a preview of Amazon RDS Proxy to help developers manage RDS connection strings for serverless applications.

Illustration of Amazon RDS Proxy

The RDS Proxy maintains a pool of established connections to your RDS database instances. This pool enables you to support a large number of application connections so your application can scale without compromising performance. It also increases security by enabling IAM authentication for database access and enabling you to centrally manage database credentials using AWS Secrets Manager.

AWS Serverless Application Repository

The AWS Serverless Application Repository (SAR) now offers Verified Author badges. These badges enable consumers to quickly and reliably know who you are. The badge appears next to your name in the SAR and links to your GitHub profile.

Screen capture of SAR Verified developer badge in the AWS Management Console

SAR Verified developer badges

AWS Developer Tools

AWS CodeCommit launched the ability for you to enforce rule workflows for pull requests, making it easier to ensure that code has passed through specific rule requirements. You can now create an approval rule specifically for a pull request, or create approval rule templates to be applied to all future pull requests in a repository.

AWS CodeBuild added beta support for test reporting. With test reporting, you can now view the detailed results, trends, and history for tests executed on CodeBuild for any framework that supports the JUnit XML or Cucumber JSON test format.

Screen capture of AWS CodeBuild

CodeBuild test trends in the AWS Management Console

Amazon CodeGuru

AWS announced a preview of Amazon CodeGuru at re:Invent 2019. CodeGuru is a machine learning based service that makes code reviews more effective and aids developers in writing code that is more secure, performant, and consistent.

AWS Amplify and AWS AppSync

AWS Amplify added iOS and Android as supported platforms. Now developers can build iOS and Android applications using the Amplify Framework with the same category-based programming model that they use for JavaScript apps.

Screen capture of 'amplify init' for an iOS application in a terminal window

The Amplify team has also improved offline data access and synchronization by announcing Amplify DataStore. Developers can now create applications that allow users to continue to access and modify data, without an internet connection. Upon connection, the data synchronizes transparently with the cloud.

For a summary of Amplify and AppSync announcements before re:Invent, read: “A round up of the recent pre-re:Invent 2019 AWS Amplify Launches”.

Illustration of AWS AppSync integrations with other AWS services

Q4 serverless content

Blog posts

October

November

December

Tech talks

We hold several AWS Online Tech Talks covering serverless topics throughout the year. These are listed in the Serverless section of the AWS Online Tech Talks page.

Here are the ones from Q4:

Twitch

October

There are also a number of other helpful video series covering Serverless available on the AWS Twitch Channel.

AWS Serverless Heroes

We are excited to welcome some new AWS Serverless Heroes to help grow the serverless community. We look forward to some amazing content to help you with your serverless journey.

AWS Serverless Application Repository (SAR) Apps

In this edition of ICYMI, we are introducing a section devoted to SAR apps written by the AWS Serverless Developer Advocacy team. You can run these applications and review their source code to learn more about serverless and to see examples of suggested practices.

Still looking for more?

The Serverless landing page has much more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials. We’re also kicking off a fresh series of Tech Talks in 2020 with new content providing greater detail on everything new coming out of AWS for serverless application developers.

Throughout 2020, the AWS Serverless Developer Advocates are crossing the globe to tell you more about serverless, and to hear more about what you need. Follow this blog to keep up on new launches and announcements, best practices, and examples of serverless applications in action.

You can also follow all of us on Twitter to see latest news, follow conversations, and interact with the team.

Chris Munns: @chrismunns
Eric Johnson: @edjgeek
James Beswick: @jbesw
Moheeb Zara: @virgilvox
Ben Smith: @benjamin_l_s
Rob Sutter: @rts_rob
Julian Wood: @julian_wood

Happy coding!

Under the hood: Scaling your Kinesis data streams

Post Syndicated from Ahmed Gaafar original https://aws.amazon.com/blogs/big-data/under-the-hood-scaling-your-kinesis-data-streams/

Real-time delivery of data and insights enables businesses to pivot quickly in response to changes in demand, user engagement, and infrastructure events, among many others. Amazon Kinesis offers a managed service that lets you focus on building your applications, rather than managing infrastructure. Scalability is provided out-of-the-box, allowing you to ingest and process gigabytes of streaming data per second. Data replication to three Availability Zones offers high availability and durability. Pricing is based on usage and requires no upfront costs, making Kinesis a cost-effective solution.

Amazon Kinesis Data Streams use a provisioned capacity model. Each data stream is composed of one or more shards that act as units of capacity. Shards make it easy for you to design and scale a streaming pipeline by providing a predefined write and read capacity. As workloads grow, an application may read or write to a shard at a rate that exceeds its capacity, creating a hot shard and requiring you to add capacity quickly. Shards also enable you to parallelize the processing of large datasets and compute results quickly.

This post discusses how to scale your data streams and avoid hot shards. The post first shows you how to estimate the number of shards you need in your data stream as you design your streaming pipeline. Then it looks at reasons that lead to hot shards and how to avoid those using Kinesis Data Streams scaling mechanisms and reviews important metrics for monitoring.

Estimating your stream capacity

The following diagram shows a streaming data pipeline connected to a multiplayer video game. Kinesis Data Streams ingest player scores and other stats. You can filter and enrich the data, and write it to DynamoDB tables that populate the game’s various leaderboards.

As you embark on designing your streaming pipeline, it’s important to set up the data stream with enough capacity to ingest the data records that producers create and to serve the consumers that read those same records. You can ingest up to 1 MB per second per shard or 1,000 data records per second per shard for writes. Read capacity is up to 2 MB per second per shard or five read transactions per second per shard. All applications reading from the stream share the read capacity. You can use the enhanced fan-out feature to scale the number of consuming applications and make sure that each has a dedicated 2 MB per second connection.

This post uses the preceding application as an example. It’s estimated that producers create data records at a rate of 20,000 KB per second, and your consumer nodes need to process this same amount of data at the other end of the stream. In addition to handling these rates, it is a good idea to add extra capacity to give the stream headroom for growth.

This headroom also helps your application recover faster in scenarios that could cause a delay or a pause in ingesting or processing data. These scenarios may include:

  • Deploying a new version of a consumer application
  • Transient network issues

As these nodes work to catch up after recovery, they produce or consume records at a higher than standard rate, requiring higher capacity. For this example, the 20,000 KB per second ingestion rate calls for 20 shards at 1 MB per second each, and you can add 25%, or five shards, for headroom, for a total of 25 shards. Shards are cost-efficient, but it is up to you how many you want to add.
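The capacity estimate for this example can be sketched in a few lines of Python. This is an illustrative calculation only: the `estimate_shards` helper is hypothetical, it treats 1 MB as 1,000 KB to match the round numbers used in this post, and it assumes a single consuming application sharing the 2 MB per second read limit.

```python
import math

# Per-shard limits quoted in this post, with 1 MB treated as 1,000 KB.
WRITE_KB_PER_SHARD = 1000
READ_KB_PER_SHARD = 2000


def estimate_shards(write_kb_per_sec, read_kb_per_sec, headroom=0.25):
    """Return (base, extra, total) shard counts for the given data rates."""
    for_writes = math.ceil(write_kb_per_sec / WRITE_KB_PER_SHARD)
    for_reads = math.ceil(read_kb_per_sec / READ_KB_PER_SHARD)
    # The stream must satisfy whichever side needs more shards.
    base = max(for_writes, for_reads)
    extra = math.ceil(base * headroom)
    return base, extra, base + extra


base, extra, total = estimate_shards(20_000, 20_000)
print(f"base={base} headroom={extra} total={total}")
```

With the rates from this example, the write side drives the estimate: 20 shards for ingestion plus five for headroom.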

Scaling scenario

At the time of the game’s release, this capacity is deemed sufficient for the application. Ingestion and processing of the data are both running smoothly, and the game’s leaderboards are populated with current data. It’s now a few weeks after release; the game is steadily gaining popularity and concurrent player numbers are increasing. In a scenario such as this, it’s important to have sufficient monitoring to detect the increased load so you can increase throughput by scaling the stream.

The following diagram provides a simplified view of using CloudWatch metrics to monitor your data streams and triggering scaling operations.

In this example, these scaling issues could manifest in delayed leaderboard update reports. Because shards are the capacity units in a data stream, each shard’s capacity is independent of other shards. If the producers write to a single shard at a rate higher than 1 MB per second or 1,000 records per second, that shard becomes a hot shard and requests exceeding that capacity get throttled, leading to a delay in leaderboard updates. This condition can happen while other shards in the stream are underutilized, so if you are monitoring metrics at the stream level, you may not see any cause for concern, because the stream overall is ingesting data at a rate below its total capacity of 25 MB per second. Amazon Kinesis enables you to seamlessly scale your stream without interrupting your streaming pipeline.
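The gap between stream-level and shard-level views can be made concrete with a small Python sketch. The per-shard rates below are invented for illustration, and `find_hot_shards` is a hypothetical helper, not a Kinesis API; in practice the rates would come from shard-level CloudWatch metrics.

```python
# Per-shard write limits quoted in this post.
SHARD_WRITE_LIMIT_MB = 1.0
SHARD_WRITE_LIMIT_RECORDS = 1000


def find_hot_shards(per_shard_rates):
    """Return the IDs of shards whose write rate exceeds either shard limit.

    per_shard_rates maps a shard ID to (MB per second, records per second).
    """
    return sorted(
        shard_id
        for shard_id, (mb_per_sec, records_per_sec) in per_shard_rates.items()
        if mb_per_sec > SHARD_WRITE_LIMIT_MB
        or records_per_sec > SHARD_WRITE_LIMIT_RECORDS
    )


# Hypothetical 25-shard stream: 24 quiet shards and one overloaded shard.
rates = {f"shard-{i:02d}": (0.4, 300) for i in range(25)}
rates["shard-07"] = (1.6, 900)

stream_total_mb = sum(mb for mb, _ in rates.values())
print("stream total MB/s:", round(stream_total_mb, 1))
print("hot shards:", find_hot_shards(rates))
```

The stream as a whole is far below its 25 MB per second capacity, yet one shard is being throttled, which is exactly the situation a stream-level metric hides.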

Core concepts that enable scaling

You can write records to a data stream using Put APIs. To write a single record, use PutRecord; to write multiple records, use PutRecords. When executing either one, the request to the Kinesis API has to include the following three components:

  • The stream name.
  • The data record to write to the stream. For this post, this is the scoring result of a particular round in the game.
  • A partition key (for example, the game session).

The following diagram shows multiple producers writing to a Kinesis data stream. The partition key value is used in combination with a hash function to determine the shard a given record will be written to.

The partition key determines to which shard the record is written. The partition key is a Unicode string with a maximum length of 256 characters. Kinesis runs the partition key value that you provide in the request through an MD5 hash function, which produces a 128-bit integer. The resulting value maps your record to a specific shard within the stream, and Kinesis writes the record to that shard. Partition keys dictate how to distribute data across the stream and use shards.
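The mapping from partition key to shard can be approximated locally with a short Python sketch. It assumes the MD5 digest is read as an unsigned big-endian 128-bit integer and uses a hypothetical two-shard stream whose hash key ranges split the key space in half; `hash_key` and `shard_for_key` are helpers written for this example, not Kinesis API calls.

```python
import hashlib


def hash_key(partition_key):
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    # Assumption: the 16-byte digest is read as a big-endian unsigned integer.
    return int.from_bytes(digest, "big")


def shard_for_key(partition_key, shards):
    """Return the ID of the shard whose hash key range contains the key."""
    value = hash_key(partition_key)
    for shard_id, start, end in shards:
        if start <= value <= end:
            return shard_id
    raise ValueError("hash key ranges do not cover the full key space")


# Hypothetical stream with two shards that split the key space evenly.
SHARDS = [
    ("shardId-000000000000", 0, 2**127 - 1),
    ("shardId-000000000001", 2**127, 2**128 - 1),
]

print(shard_for_key("pk1234", SHARDS))
```

Because the hash is deterministic, every record that carries the same partition key lands on the same shard for as long as the shard layout does not change.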

Certain use cases require you to partition data based on specific criteria for efficient processing by the consuming applications. As an example, if you use player ID pk1234 as the partition key, all scores related to that player route to shard1. The consuming application can use the fact that data stored in shard1 has an affinity with the player ID and can efficiently calculate the leaderboard. An increase in traffic related to players mapped to shard1 can lead to a hot shard. Kinesis Data Streams allows you to handle such scenarios by splitting or merging shards without disrupting your streaming pipeline.

If your use cases do not require data stored in a shard to have high affinity, you can achieve high overall throughput by using a random partition key to distribute data. Random partition keys help distribute the incoming data records evenly across all the shards in the stream and reduce the likelihood of one or more shards getting hit with a disproportionate number of records. You can use a universally unique identifier (UUID) as a partition key to achieve this uniform distribution of records across shards. This strategy can increase the latency of record processing if the consumer application has to aggregate data from multiple shards.
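The effect of random partition keys can be simulated locally. The following Python sketch assumes four evenly sized shards and the same MD5-based mapping described earlier; `shard_index` is a hypothetical helper, and the simulation only approximates how a real stream would distribute records.

```python
import hashlib
import uuid
from collections import Counter


def shard_index(partition_key, shard_count):
    """Return the index of the evenly sized shard a partition key maps to."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    value = int.from_bytes(digest, "big")
    # Scale the 128-bit hash value down to an index in [0, shard_count).
    return value * shard_count // 2**128


SHARD_COUNT = 4
RECORDS = 10_000

# A random UUID per record spreads records across all shards.
random_keys = Counter(
    shard_index(str(uuid.uuid4()), SHARD_COUNT) for _ in range(RECORDS)
)

# A single fixed key sends every record to the same shard.
fixed_key = Counter(shard_index("pk1234", SHARD_COUNT) for _ in range(RECORDS))

print("random keys:", dict(random_keys))
print("fixed key:  ", dict(fixed_key))
```

With random keys each shard receives roughly a quarter of the records, while the fixed key concentrates the entire load on one shard.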

Kinesis Data Streams scaling mechanisms

The stream remains fully functional during scaling actions. Producers and consumers can continue to write to and read from the stream during the scaling process.

Upon receiving the scaling request, Kinesis sets the stream status to UPDATING. You can use the DescribeStream API to check the stream status. When the operation is complete, the stream status shows as ACTIVE.

The SplitShard action splits one active shard into two shards, increasing the read and write capacity of the stream. This can be helpful if there is an expected increase in the number of records ingested into the stream or so more Kinesis Data Streams applications can simultaneously read data from the stream for real-time processing.

SplitShard facilitates that process. The hash key space of the parent shard also splits. The two new shards accept new data records and the parent shard stops accepting new records. Existing records in the parent shard are retained for the duration of the stream retention period (the default is 24 hours, configurable up to 7 days). You must specify the new-starting-hash-key value when issuing this command. This value determines the point of the split within the parent shard’s hash key space. In most cases, you want to do an even split. However, you might need to do an uneven split if you have unbalanced shards that you want to rebalance, for example. The following diagram shows the SplitShard process in action.
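For an even split, the new starting hash key is the midpoint of the parent shard's hash key range. A minimal Python sketch of that calculation follows; `even_split_point` is a hypothetical helper, and it returns a string only because hash keys appear as decimal strings in the API output shown later in this post.

```python
def even_split_point(starting_hash_key, ending_hash_key):
    """Return the new starting hash key that splits a shard's range in half.

    Both arguments and the result are decimal strings.
    """
    start = int(starting_hash_key)
    end = int(ending_hash_key)
    if end <= start:
        raise ValueError("the range holds a single hash key and cannot be split")
    # The second child starts at the midpoint; the first child ends just before it.
    return str((start + end + 1) // 2)


# Parent shard covering the lower half of the 128-bit key space.
parent_start = "0"
parent_end = "170141183460469231731687303715884105727"

new_start = even_split_point(parent_start, parent_end)
print("first child: ", parent_start, "to", int(new_start) - 1)
print("second child:", new_start, "to", parent_end)
```

For an uneven split, you would pick a different value strictly inside the parent's range instead of the midpoint.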

Many streaming workloads have variable data flow rates that fluctuate over time, sometimes following daily, weekly, or seasonal patterns. As you monitor your data flow rates, you may see underutilized shards that, if merged, still have a data flow rate below the shard limits, yet reduce the cost of the stream.

The MergeShards action merges two adjacent shards, producing one shard. Two shards are considered adjacent if the hash key spaces for the two form a contiguous set with no gaps. When the two shards merge, their hash key spaces also merge. The new shard starts accepting new data records. The two parent shards stop accepting new records and retain existing records up to the stream’s configured retention period. The following diagram shows the MergeShards process in action.
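The adjacency rule can be checked before you issue a merge. This Python sketch represents each shard as a hypothetical `(starting_hash_key, ending_hash_key)` tuple of integers; `are_adjacent` and `merged_range` are illustrative helpers, not API calls.

```python
def are_adjacent(shard_a, shard_b):
    """Return True if two hash key ranges form one contiguous range with no gap."""
    first, second = sorted([shard_a, shard_b])
    return first[1] + 1 == second[0]


def merged_range(shard_a, shard_b):
    """Return the hash key range that results from merging two adjacent shards."""
    if not are_adjacent(shard_a, shard_b):
        raise ValueError("shards are not adjacent and cannot be merged")
    first, second = sorted([shard_a, shard_b])
    return (first[0], second[1])


# Hypothetical stream with three shards of unequal size.
shard_1 = (0, 2**126 - 1)
shard_2 = (2**126, 2**127 - 1)
shard_3 = (2**127, 2**128 - 1)

print(are_adjacent(shard_1, shard_2))  # contiguous ranges
print(are_adjacent(shard_1, shard_3))  # shard_2 sits between them
print(merged_range(shard_1, shard_2))
```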

The UpdateShardCount action is useful when you need to scale your stream, up or down, to a specific number of shards, and you provide that number as a parameter in the API call. Scaling in increments of 25% of the current capacity (25%, 50%, 75%, 100%) helps the operation complete faster, but is not required. The command executes a series of SplitShard and MergeShards actions as needed to reach the explicit number of shards you specified. This command splits shard hash key space evenly, creating shards of equal size. There is no option to choose a different value.
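To picture what an even division of the hash key space looks like, the following Python sketch computes equal-sized ranges for a target shard count. It is an illustration of the idea only: `even_hash_ranges` is a hypothetical helper, and the exact boundaries the service chooses may differ.

```python
HASH_SPACE = 2**128  # MD5 produces 128-bit hash keys


def even_hash_ranges(shard_count):
    """Return (start, end) hash key ranges that divide the key space evenly."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    boundaries = [i * HASH_SPACE // shard_count for i in range(shard_count + 1)]
    return [(boundaries[i], boundaries[i + 1] - 1) for i in range(shard_count)]


for start, end in even_hash_ranges(4):
    print(start, "to", end)
```

Comparing ranges like these with the actual ranges of your stream is one way to spot shards that have drifted out of balance after manual splits and merges.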

Additionally, the Kinesis Scaling Utility available on GitHub provides autoscaling for Kinesis Data Streams by monitoring a stream’s Amazon CloudWatch metrics and scaling it up or down accordingly. It can scale a stream by an explicit shard count or as a percentage of the total fleet. There is no requirement for you to manage the allocation of the key space to shards when using this API; it happens automatically.

Balancing shards

After completing a scaling operation, check the distribution of the hash key space in your stream. In most use cases, the hash key space should be evenly distributed across the shards in the stream. Errors in calculating or inputting a shard’s hash key space starting value can lead to creating new shards with an unusually large or small hash key space. Unusually large shards receive a high number of read and write requests, which leads to throttling, while the unusually small shards are underutilized.

The output of the ListShards API lists the starting and ending hash key value for each shard in the stream. You can use these values to identify unbalanced shards, and perform the necessary splits or merges to balance them. The Kinesis Scaling Utility can also generate a report of shard key space sizes that can help you achieve the same result. See the following code:

{
    "Shards": [
        {
            "ShardId": "shardId-000000000000", 
            "HashKeyRange": {
                "EndingHashKey": "170141183460469231731687303715884105727", 
                "StartingHashKey": "0"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817078608863948980125442188478720910276534730754"
            }
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "HashKeyRange": {
                "EndingHashKey": "340282366920938463463374607431768211455", 
                "StartingHashKey": "170141183460469231731687303715884105728"
            }, 
            "SequenceNumberRange": {
                "StartingSequenceNumber": "49600965817100909609147510748583724196993558638040711186"
            }
        }
    ]
}
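One way to read this output is to compute the share of the hash key space that each open shard covers. The Python sketch below does that for a trimmed copy of the preceding output; `key_space_shares` is a hypothetical helper, and it assumes that closed parent shards can be recognized by an `EndingSequenceNumber` in their sequence number range.

```python
import json

HASH_SPACE = 2**128


def key_space_shares(list_shards_output):
    """Return the fraction of the hash key space covered by each open shard."""
    shares = {}
    for shard in list_shards_output["Shards"]:
        # Assumption: closed (parent) shards carry an ending sequence number.
        if "EndingSequenceNumber" in shard.get("SequenceNumberRange", {}):
            continue
        key_range = shard["HashKeyRange"]
        size = int(key_range["EndingHashKey"]) - int(key_range["StartingHashKey"]) + 1
        shares[shard["ShardId"]] = size / HASH_SPACE
    return shares


# Trimmed copy of the output above (sequence number ranges omitted).
output = json.loads("""
{
    "Shards": [
        {
            "ShardId": "shardId-000000000000",
            "HashKeyRange": {
                "EndingHashKey": "170141183460469231731687303715884105727",
                "StartingHashKey": "0"
            }
        },
        {
            "ShardId": "shardId-000000000001",
            "HashKeyRange": {
                "EndingHashKey": "340282366920938463463374607431768211455",
                "StartingHashKey": "170141183460469231731687303715884105728"
            }
        }
    ]
}
""")

for shard_id, share in key_space_shares(output).items():
    print(f"{shard_id}: {share:.1%}")
```

In this example both shards cover half of the key space, so the stream is balanced. A shard whose share is far from one divided by the number of open shards is a candidate for a split or a merge.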

Monitoring your streams to preempt hot shards

As the hot shard scenario has demonstrated, monitoring your data streams at only the stream level does not prepare you for issues at the shard level. Kinesis offers a multitude of stream level and shard level metrics. At the shard level, IncomingBytes and IncomingRecords show you the ingestion rate into the shard. WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded indicate throttled Put and Get requests, respectively. At the stream level, keep an eye on PutRecord.Success, in which the average value reflects the percentage of PutRecord success over time. Paired with proper thresholds for alerting, these metrics should help you take scaling actions proactively in response to flow changes in and out of your streams, and reduce the possibility of developing hot shards.
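As a starting point for alerting, the following Python sketch assembles the parameters for a CloudWatch alarm on write throttling. It only builds a dictionary and makes no AWS calls; `throttle_alarm_params`, the alarm name, the stream name, and the threshold are assumptions for illustration. The keys are meant to line up with the CloudWatch PutMetricAlarm request, and shard-level metrics are assumed to be enabled on the stream.

```python
def throttle_alarm_params(stream_name, shard_id=None, threshold=0):
    """Build alarm parameters for throttled writes on a stream or a single shard."""
    dimensions = [{"Name": "StreamName", "Value": stream_name}]
    suffix = "stream"
    if shard_id is not None:
        # Shard-level metrics carry an extra ShardId dimension.
        dimensions.append({"Name": "ShardId", "Value": shard_id})
        suffix = shard_id
    return {
        "AlarmName": f"{stream_name}-{suffix}-write-throttled",  # hypothetical name
        "Namespace": "AWS/Kinesis",
        "MetricName": "WriteProvisionedThroughputExceeded",
        "Dimensions": dimensions,
        "Statistic": "Sum",
        "Period": 60,  # evaluate one-minute totals
        "EvaluationPeriods": 1,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanThreshold",
    }


# Hypothetical stream and shard names.
params = throttle_alarm_params("game-scores", "shardId-000000000000")
print(params)
```

A dictionary like this could be passed to an SDK call that creates the alarm, with the threshold tuned to how much throttling your pipeline tolerates.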

The following image shows a snapshot of a CloudWatch dashboard with several metrics of a Kinesis data stream.

Conclusion

This post discussed how to simplify the scaling and monitoring of your Kinesis data streams. It’s important to spend some time considering the expected data flow rate for your stream to find the appropriate capacity. Choosing a good partition key strategy helps you take full advantage of the capacity you provision and avoid hot shards. Monitoring your stream metrics and setting alarm thresholds helps you gain the visibility you need to make better scaling decisions.

For more information, see What Is Amazon Kinesis Data Streams? For more information about Kinesis API actions, see Actions.

 


About the Author

Ahmed Gaafar is a senior technical account manager at AWS.

New AWS Lambda controls for stream processing and asynchronous invocations

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/new-aws-lambda-controls-for-stream-processing-and-asynchronous-invocations/

Today AWS Lambda is introducing new controls for asynchronous and stream processing invocations. These new features allow you to customize responses to Lambda function errors and build more resilient event-driven and stream-processing applications.

Stream processing function invocations

When processing data from event sources such as Amazon Kinesis Data Streams and Amazon DynamoDB Streams, Lambda reads records in batches via shards. A shard is a uniquely identified sequence of data records. Your function is then invoked to process records from the batch “in order.” If an error is returned, Lambda retries the batch until processing succeeds or the data expires. This retry behavior is desirable in many cases, but not all:

  1. Until the issue is resolved, no data in the shard is processed. A single malformed record can prevent processing on an entire shard because the “in order” guarantee ensures that failed batches are retried until the data record expires. This is often referred to as a “poison pill” event in that it prevents the rest of the system from processing data.
  2. In some cases, it may not be helpful to retry if subsequent invocations will also fail.
  3. If the function is not idempotent, retrying might produce unexpected side effects, and may result in duplicate outputs (for example, multiple database entries or business transactions).
Figure 1: Default stream invocation retry logs.

The new Lambda controls for stream processing invocations

With the new customizable controls, users are able to control how function errors and retries impact stream processing function invocations. A new event source-mapping configuration subresource (shown below) allows developers to clearly specify which controls will apply specifically to invocations.

DestinationConfig: {
    OnFailure: {
       Destination: SNS/SQS arn (String)
    }
}
Figure 2: Stream processing destination configuration.
{
    "MaximumRetryAttempts": integer,
    "BisectBatchOnFunctionError" : boolean,
    "MaximumRecordAgeInSeconds" : integer
}
Figure 3: Stream processing failure configuration.

MaximumRetryAttempts

Set a maximum number of retry attempts for batches before they can be skipped to unblock processing and prevent duplicate outputs.

Minimum: 0 | maximum: 10,000 | default: 10,000

MaximumRecordAgeInSeconds

Define a maximum record age in seconds, with expired records skipped to allow processing to continue. Data records that are not successfully processed within the defined age are skipped.

Minimum: 60 | default/maximum: 604,800

BisectBatchOnFunctionError

This gives the option to recursively split the failed batch and retry on a smaller subset of records, eventually isolating the record causing the error.

Default: false
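The idea behind bisecting a failed batch can be shown with a small local simulation. This Python sketch is a simplified model, not Lambda's implementation: `isolate_failures` and `handler` are hypothetical, and the sketch ignores retry limits and record age.

```python
def isolate_failures(batch, handler):
    """Recursively halve a failing batch.

    Returns (processed, failed), where failed holds the single records
    that still fail when the handler receives them on their own.
    """
    try:
        handler(batch)
        return list(batch), []
    except Exception:
        if len(batch) == 1:
            # A batch of one that still fails is the problem record.
            return [], list(batch)
        mid = len(batch) // 2
        left_ok, left_bad = isolate_failures(batch[:mid], handler)
        right_ok, right_bad = isolate_failures(batch[mid:], handler)
        return left_ok + right_ok, left_bad + right_bad


def handler(records):
    """Hypothetical function code that fails on a malformed record."""
    for record in records:
        if record == "bad":
            raise ValueError("malformed record")


processed, failed = isolate_failures(["a", "b", "bad", "c"], handler)
print("processed:", processed)
print("failed:", failed)
```

Records in a half that succeeds are processed normally, so only the problem record is left to be retried or skipped.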

On-failure Destination

When either MaximumRetryAttempts or MaximumRecordAgeInSeconds reaches the specified value, a record will be skipped. If ‘Destination’ is set, metadata about the skipped records can be sent to a target ARN (that is, an Amazon SQS queue or Amazon SNS topic). If no target is configured, then the record will be dropped.
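The interaction of these settings can be summarized as a small decision function. The following Python sketch is a simplified model of the behavior described above, not Lambda's internal logic; `next_action`, its return values, and the queue ARN are hypothetical, and the defaults mirror the limits listed in this post.

```python
def next_action(
    failed_attempts,
    record_age_seconds,
    max_retry_attempts=10_000,
    max_record_age_seconds=604_800,
    destination_arn=None,
):
    """Decide what happens to a batch after its latest failed invocation."""
    # The first invocation is not a retry, so retries used is attempts minus one.
    retries_used = max(failed_attempts - 1, 0)
    exhausted = retries_used >= max_retry_attempts
    expired = record_age_seconds > max_record_age_seconds
    if not (exhausted or expired):
        return "retry"
    # The batch is skipped; its metadata goes to the destination if one is set.
    return "send_metadata_to_destination" if destination_arn else "drop"


# Hypothetical queue ARN, for illustration only.
queue_arn = "arn:aws:sqs:us-east-1:123456789012:example-queue"

print(next_action(1, 5, max_retry_attempts=1, max_record_age_seconds=60))
print(next_action(2, 5, max_retry_attempts=1, max_record_age_seconds=60,
                  destination_arn=queue_arn))
print(next_action(1, 90, max_retry_attempts=1, max_record_age_seconds=60))
```

With one retry allowed, the first failure leads to a retry, the second failure skips the batch, and a record older than the maximum age is skipped regardless of how many retries remain.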

Getting started with error handling for stream processing invocations

Here you will see how to use the new controls to create a customized error handling configuration for stream processing invocations. You will set the maximum retry attempts to 1, the maximum record age to 60 seconds, and send the metadata of skipped records to an Amazon Simple Queue Service (SQS) queue. First, create a Kinesis data stream to put records into, and create an SQS queue to hold the metadata of retry-exhausted or expired data records.

  1. Go to the Lambda console and choose Create function.
  2. Ensure that Author from scratch is selected, and enter a function name such as “customStreamErrorExample.”
  3. Select Runtime as Node.js 12.x.
  4. Choose Create function.
    To enable Kinesis as an event trigger and to send metadata of a failed invocation to the SQS standard queue, you must give the Lambda execution role the required permissions.
  5. Scroll down to the Execution Role section and select the view link below the Existing role drop-down.
    Existing role selection
  6. Choose Add inline policy > JSON, then paste the following into the text box replacing {yourSQSarn} with the ARN of the SQS queue you created earlier and replacing {yourKinesisarn} with the ARN of the stream you created earlier. Choose Review policy.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "sqs:SendMessage",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:DescribeStream"
                ],
                "Resource": [
                    "{yourSQSarn}",
                    "{yourKinesisarn}"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": "kinesis:ListStreams",
                "Resource": "*"
            }
        ]
    }
    
  7. Give the policy a name such as CustomSQSKinesis and choose Create policy.
  8. Navigate back to your Lambda function and choose Add trigger.
  9. Select Kinesis from the drop-down. Select your stream in the Kinesis stream dropdown.
  10. To see the new control options, choose the drop-down arrow on the Additional settings section.
  11. Paste the ARN of the previously created SQS queue (refer to this link to find the ARN).
    Additional settings
  12. Set the Maximum retry attempts to 1. Set the Maximum record age to 60. Choose Add.
    The Kinesis trigger has now been configured and the Lambda function has the required permissions to write to SQS and CloudWatch Logs.
    Success message
    Kinesis trigger added
  13. Choose the Lambda icon in the Designer section. Scroll down to the Function code section and paste the following code:
    exports.handler = async (event) => {
        // TODO implement
        console.log(event)
        const response = {
            statusCode: 200,
            body: event,dfh
        };
        return response;
    };

    This code has a syntax error in order to force a failure response.

    Next, you will trigger the Lambda function by adding a record to the Kinesis Stream via the AWS CLI. See how to install the AWS CLI if you have not previously done so.

  14. In your preferred terminal run the following command, replacing {YourStreamName} with the name of the stream you have created.
    aws kinesis put-record --stream-name {YourStreamName} --partition-key 123 --data testdata

    You should see a response similar to this (your sequence number will be different):
    Terminal response

  15. In the AWS Lambda console, choose Monitoring > View logs in CloudWatch and select the most recent log group.
    CloudWatch Logs
  16. As you can see the Lambda function failed as expected, but this time with only a single retry attempt. This is because the max retry attempt value was set to 1 and the record is younger than the max record age that was set to 60.
  17. In the AWS Management Console navigate to your SQS queue, Services > SQS.
  18. Select your queue and choose Queue Actions > View/Delete Messages > Start Polling for Messages.
    View/delete messages

Here you will see the metadata of your failed invocation. It has successfully been sent to the SQS destination for further investigation or processing.

Asynchronous function invocations

When a function is asynchronously invoked, Lambda sends the event to a queue before it is processed by your function. Invocations that result in an exception in the function code are retried twice with a delay of one minute before the first retry and two minutes before the second. Some invocations may never run due to a throttle or service fault: these are retried with exponential backoff until the invocation is six hours old.

Default asynchronous invocation retry logs

The retry behavior shown above works well in situations where every invocation must run at least once. However, in situations with a large volume of errors, subsequent invocations are increasingly delayed as the backlog grows, because each new error places another retry event back into the event queue. If it were possible to skip the event without retrying, it would eliminate this delay and continue processing new events.

In order to avoid this default auto-retry policy, there are several widely used approaches:

  • Function error handling with AWS Step Functions
  • Using the event handler for error routing logic
  • Third party monitoring services for exception alerts

These workarounds require extra resources, code, or custom logic and add latency to your applications. Retries caused by system failures due to timeouts, lack of memory, early exits, etc. could still occur.

New Lambda controls for asynchronous event invocations

New asynchronous event invoke controls mean that functions can now skip the processing of certain events and discard unwanted or backlogged requests from the asynchronous event queue without the need for “workarounds.” The controls will be accessible from the console and from a new Event Invoke Config subresource, listed in detail below:

{
    "MaximumEventAgeInSeconds": integer,
    "MaximumRetryAttempts": integer
}
Figure 4: Asynchronous event invoke configuration.

MaximumRetryAttempts

Set a maximum number of retry attempts for events before they can be skipped to unblock processing and prevent duplicate outputs.

Minimum: 0 | maximum: 2 | default: 2

MaximumEventAgeInSeconds

Define a maximum event age in seconds, with expired events skipped to allow processing to continue. Events that are not successfully processed within the defined age of submission are written to the function’s configured Dead Letter Queue and/or On-failure Destinations for asynchronous invocations. If none is configured, the event is discarded.

Minimum: 60 | maximum: 21600 (6 hours)
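As a rough illustration of how the two settings interact, the following Python sketch computes when a failing event would be attempted, using the retry delays described earlier (one minute before the first retry, two minutes before the second). It assumes the function fails immediately on every attempt and ignores throttling and service faults, so it is a simplified model rather than a description of Lambda's scheduler. The function name is hypothetical.

```python
from typing import List

# Delay before the first retry and before the second retry, in seconds.
RETRY_DELAYS_SECONDS = (60, 120)


def attempt_times(max_retry_attempts: int = 2,
                  max_event_age_seconds: int = 21_600) -> List[int]:
    """Return the event age, in seconds, at each attempt for an event that always fails."""
    if not 0 <= max_retry_attempts <= 2:
        raise ValueError("MaximumRetryAttempts must be between 0 and 2")
    if not 60 <= max_event_age_seconds <= 21_600:
        raise ValueError("MaximumEventAgeInSeconds must be between 60 and 21600")

    times = [0]                                    # the initial invocation
    elapsed = 0
    for delay in RETRY_DELAYS_SECONDS[:max_retry_attempts]:
        elapsed += delay
        if elapsed > max_event_age_seconds:
            break                                  # event expired before this retry
        times.append(elapsed)
    return times


print(attempt_times())                             # default settings
print(attempt_times(max_retry_attempts=0))         # no retries
print(attempt_times(max_event_age_seconds=120))    # age limit cuts off the second retry
```

With the defaults, the event is tried three times in this model; with zero retries, only once; and with a 120-second age limit, the second retry never happens because the event would already be 180 seconds old.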

These controls can also be configured from the AWS Lambda console, in the “Failure handling configuration” section shown below:

Edit failure handling configurations

When the same Lambda function is run with maximum retry attempts set to 0, the Amazon CloudWatch Logs show that the function is only invoked a single time, and not retried after the initial error.

CloudWatch Logs

Conclusion

The addition of these new Lambda controls allows you to handle function failures and retries appropriately for both asynchronous and stream processing. This eliminates the need for custom-built logic, alerts, and added overhead.

Developers understand the needs of their own applications. These new features allow them to decide how to react to errors on a per-function basis. Whether that means real-time stream processing, non-blocking event queues, or more retries, it depends on the application’s requirements. With the new controls in place, the power is in your hands.

You can get started with the new controls for stream processing and asynchronous invocations via the AWS Management Console, AWS CLI, AWS SAM, AWS CloudFormation, or AWS SDK for Lambda.

New AWS Lambda scaling controls for Kinesis and DynamoDB event sources

Post Syndicated from Moheeb Zara original https://aws.amazon.com/blogs/compute/new-aws-lambda-scaling-controls-for-kinesis-and-dynamodb-event-sources/

AWS Lambda is introducing a new scaling parameter for Amazon Kinesis Data Streams and Amazon DynamoDB Streams event sources. Parallelization Factor can be set to increase concurrent Lambda invocations for each shard, which by default is 1. This allows for faster stream processing without the need to over-scale the number of shards, while still guaranteeing order of records processed.

There are two common optimization scenarios: high traffic and low traffic. For example, an online business might experience seasonal spikes in traffic. The following features help ensure that your business can scale appropriately to withstand the upcoming holiday season.

Handling high traffic with Parallelization Factor

A diagram showing how Parallelization Factor maintains order.

Each shard has a uniquely identified sequence of data records. Each record contains a partition key, and records are organized into shards based on that key. The records from each shard must be polled in sequence to guarantee that records with the same partition key are processed in order.

When there is a high volume of data traffic, you want to process records as fast as possible. Before this release, customers were solving this by updating the number of shards on a Kinesis data stream. Increasing the number of shards increases the number of functions processing data from those shards. One Lambda function invocation processes one shard at a time.

You can now use the new Parallelization Factor to specify the number of concurrent batches that Lambda polls from a single shard. This feature introduces more flexibility in scaling options for Lambda and Kinesis. The default factor of one exhibits normal behavior. A factor of two allows up to 200 concurrent invocations on 100 Kinesis data shards. The Parallelization Factor can be scaled up to 10.

Each parallelized shard contains messages with the same partition key. This means record processing order will still be maintained and each parallelized shard must complete before processing the next.
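The following Python sketch illustrates why ordering per partition key can survive parallel processing: if every record with a given key is always routed to the same concurrent batch, records for that key are never processed out of order. The MD5-based routing is an assumption chosen for illustration and is not necessarily how Lambda assigns records internally. The second helper reproduces the arithmetic from the text (100 shards with a factor of two allow up to 200 concurrent invocations).

```python
import hashlib
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, str]  # (partition_key, data)


def split_shard(records: List[Record], factor: int) -> Dict[int, List[Record]]:
    """Route one shard's records into up to `factor` ordered sub-batches by partition key."""
    if not 1 <= factor <= 10:
        raise ValueError("Parallelization Factor must be between 1 and 10")
    lanes: Dict[int, List[Record]] = defaultdict(list)
    for key, data in records:
        digest = hashlib.md5(key.encode("utf-8")).digest()
        lane = int.from_bytes(digest[:4], "big") % factor   # same key, same lane
        lanes[lane].append((key, data))                     # arrival order is kept
    return dict(lanes)


def max_concurrent_invocations(shards: int, factor: int) -> int:
    """Upper bound on concurrent invocations for a stream."""
    return shards * factor


records = [("user-1", "a"), ("user-2", "b"), ("user-1", "c"), ("user-3", "d"), ("user-2", "e")]
for lane, batch in sorted(split_shard(records, factor=2).items()):
    print(lane, batch)
print(max_concurrent_invocations(shards=100, factor=2))
```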

Using Parallelization Factor

Since Parallelization Factor is quickly set on an event source mapping, it can be increased or decreased on demand. Fully automated scaling of stream processing is now possible.

For example, Amazon CloudWatch can be used to monitor changes in traffic. High traffic can cause the IteratorAge metric to increase, and an alarm can be created if this occurs for some specified period of time. The alarm can trigger a Lambda function that uses the UpdateEventSourceMapping API to increase the Parallelization Factor. In the same way, an alarm can be set to reduce the factor if traffic decreases.
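A minimal Python sketch of the scaling logic such an alarm-triggered function might contain is shown below. The doubling and halving policy, the helper names, and the mapping UUID are assumptions made for illustration. `get_event_source_mapping` and `update_event_source_mapping` are the names of the corresponding calls in the AWS SDK for Python; here they are exercised against a small in-memory stand-in so the example runs without AWS credentials.

```python
from typing import Any, Dict

MIN_FACTOR, MAX_FACTOR = 1, 10


def next_factor(current: int, scale_up: bool) -> int:
    """Double or halve the factor, clamped to the allowed range of 1 to 10."""
    proposed = current * 2 if scale_up else current // 2
    return max(MIN_FACTOR, min(MAX_FACTOR, proposed))


def rescale(lambda_client: Any, mapping_uuid: str, scale_up: bool) -> int:
    """Read the current factor from the mapping and update it if it should change."""
    mapping = lambda_client.get_event_source_mapping(UUID=mapping_uuid)
    current = mapping.get("ParallelizationFactor", 1)
    target = next_factor(current, scale_up)
    if target != current:
        lambda_client.update_event_source_mapping(
            UUID=mapping_uuid, ParallelizationFactor=target)
    return target


class InMemoryClient:
    """Stand-in exposing only the two client methods used above (for a dry run)."""

    def __init__(self, factor: int) -> None:
        self.factor = factor

    def get_event_source_mapping(self, UUID: str) -> Dict[str, Any]:
        return {"UUID": UUID, "ParallelizationFactor": self.factor}

    def update_event_source_mapping(self, UUID: str,
                                    ParallelizationFactor: int) -> Dict[str, Any]:
        self.factor = ParallelizationFactor
        return {"UUID": UUID, "ParallelizationFactor": self.factor}


client = InMemoryClient(factor=4)
print(rescale(client, "example-mapping-uuid", scale_up=True))    # IteratorAge alarm fired
print(rescale(client, "example-mapping-uuid", scale_up=True))    # still behind, clamp at 10
print(rescale(client, "example-mapping-uuid", scale_up=False))   # traffic dropped
```

In a deployed function, the stand-in would be replaced by a real Lambda client, and the alarm state in the incoming event would decide the `scale_up` value.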

You can enable Parallelization Factor in the AWS Lambda console by creating or updating a Kinesis or DynamoDB event source. Choose Additional settings and set the Concurrent batches per shard to the desired factor, between 1 and 10.

Configuring the Parallelization Factor from the AWS Lambda console.


You can also enable this feature from the AWS CLI using the --parallelization-factor parameter when creating or updating an event source mapping.

$ aws lambda create-event-source-mapping --function-name my-function \
--parallelization-factor 2 --batch-size 100 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
{
	"UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
	"ParallelizationFactor": 2,
	"BatchSize": 100,
	"MaximumBatchingWindowInSeconds": 0,
	"EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
	"FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
	"LastModified": 1541139209.351,
	"LastProcessingResult": "No records processed",
	"State": "Creating",
	"StateTransitionReason": "User action"
}

Handling low traffic with Batch Window

Previously, you could use Batch Size to handle low volumes, or handle tasks that were not time sensitive. Batch Size configures the number of records to read from a shard, up to 10,000. The payload limit of a single invocation is 6 MB.

In September, we launched Batch Window, which allows you to fine tune when Lambda invocations occur. Lambda normally reads records from a Kinesis data stream at a particular interval. This feature is ideal in situations where data is sparse and batches of data take time to build up.

Using Batch Window, you can set your function to wait up to 300 seconds for a batch to build before processing it. This means you can also set your function to process on certain conditions, such as reaching the payload size, or Batch Size reaching its maximum value. With Batch Window, you can manage the average number of records processed by the function with each invocation. This allows you to increase the efficiency of each invocation and reduce the total number.
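To see how the three conditions interact, here is a simplified Python simulation that groups incoming records into invocations. A batch is handed to the function when the window has elapsed since the first record in the batch arrived, when Batch Size is reached, or when adding another record would exceed the 6 MB payload limit. The timing model (the window starts with the first buffered record) and the function name are assumptions for illustration, not a specification of Lambda's poller.

```python
from typing import List, Tuple

MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # payload limit of a single invocation


def plan_invocations(arrivals: List[Tuple[float, int]],
                     batch_size: int,
                     window_seconds: float) -> List[List[int]]:
    """Group records into invocations.

    arrivals: (arrival_time_seconds, record_size_bytes) pairs sorted by time.
    Returns one list of record indices per invocation.
    """
    invocations: List[List[int]] = []
    batch: List[int] = []
    batch_start = 0.0
    payload = 0

    for index, (arrived_at, size) in enumerate(arrivals):
        window_over = bool(batch) and arrived_at - batch_start >= window_seconds
        too_large = bool(batch) and payload + size > MAX_PAYLOAD_BYTES
        if window_over or too_large:
            invocations.append(batch)        # flush what was buffered so far
            batch, payload = [], 0
        if not batch:
            batch_start = arrived_at         # window starts with the first record
        batch.append(index)
        payload += size
        if len(batch) == batch_size:         # Batch Size reached: invoke right away
            invocations.append(batch)
            batch, payload = [], 0

    if batch:
        invocations.append(batch)            # remaining records go out at window end
    return invocations


sparse = [(0, 1000), (100, 1000), (250, 1000), (400, 1000), (410, 1000)]
print(plan_invocations(sparse, batch_size=100, window_seconds=300))
print(plan_invocations(sparse, batch_size=100, window_seconds=0))
print(plan_invocations(sparse, batch_size=2, window_seconds=300))
```

With sparse traffic, a 300-second window turns five single-record invocations into two, which is the efficiency gain described above.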

Batch Window is set when adding a new event trigger in the AWS Lambda console.

Adding an event source trigger in the AWS Lambda console


It can also be set using AWS CLI with the --maximum-batching-window-in-seconds parameter.

$ aws lambda create-event-source-mapping --function-name my-function \
--maximum-batching-window-in-seconds 300 --batch-size 100 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
{
	"UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284",
	"BatchSize": 100,
	"MaximumBatchingWindowInSeconds": 300,
	"EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
	"FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function",
	"LastModified": 1541139209.351,
	"LastProcessingResult": "No records processed",
	"State": "Creating",
	"StateTransitionReason": "User action"
}

Conclusion

You now have new options for managing scale in Amazon Kinesis and Amazon DynamoDB stream processing. The Batch Window parameter allows you to tune how long to wait before processing a batch, ideal for low traffic or tasks that aren’t time sensitive. The Parallelization Factor parameter enables faster stream processing of ordered records at high volume, using concurrent Lambda invocations per shard. Both of these features can lead to more efficient stream processing.

New – AWS IoT Greengrass Adds Container Support and Management of Data Streams at the Edge

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-aws-iot-greengrass-adds-docker-support-and-streams-management-at-the-edge/

AWS IoT Greengrass extends cloud capabilities to edge devices, so that they can respond to local events in near real-time, even with intermittent connectivity.

Today, we are adding two features that make it easier to build IoT solutions:

  • Container support to deploy applications using the Greengrass Docker application deployment connector.
  • Collect, process, and export data streams from edge devices and manage the lifecycle of that data with the Stream Manager for AWS IoT Greengrass.

Let’s see how these new features work and how to use them.

Deploying a Container-Based Application to a Greengrass Core Device
You can now run AWS Lambda functions and container-based applications in your AWS IoT Greengrass core device. In this way it is easier to migrate applications from on-premises, or build new applications that include dependencies such as libraries, other binaries, and configuration files, using container images. This provides a consistent deployment environment for your applications that enables portability across development environments and edge locations. You can easily deploy legacy and third-party applications by packaging the code or executables into the container images.

To use this feature, I describe my container-based application using a Docker Compose file. I can reference container images in public or private repositories, such as Amazon Elastic Container Registry (ECR) or Docker Hub. To start, I create a simple web app using Python and Flask that counts the number of times it is viewed.

from flask import Flask

app = Flask(__name__)

counter = 0

@app.route('/')
def hello():
    global counter
    counter += 1
    return 'Hello World! I have been seen {} times.\n'.format(counter)

My requirements.txt file contains a single dependency, flask.

I build the container image using this Dockerfile and push it to ECR.

FROM python:3.7-alpine
WORKDIR /code
ENV FLASK_APP app.py
ENV FLASK_RUN_HOST 0.0.0.0
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
COPY . .
CMD ["flask", "run"]

Here is the docker-compose.yml file referencing the container image in my ECR repository. Docker Compose files can describe applications using multiple containers, but for this example I am using just one.

version: '3'
services:
  web:
    image: "123412341234.dkr.ecr.us-east-1.amazonaws.com/hello-world-counter:latest"
    ports:
      - "80:5000"

I upload the docker-compose.yml file to an Amazon Simple Storage Service (S3) bucket.

Now I create an AWS IoT Greengrass group using an Amazon Elastic Compute Cloud (EC2) instance as core device. Usually your core device is outside of the AWS cloud, but using an EC2 instance can be a good way to set up and automate a dev & test environment for your deployments at the edge.

When the group is ready, I run an “empty” deployment, just to check that everything is working as expected. After a few seconds, my first deployment has completed and I start adding a connector.

In the connector section of the AWS IoT Greengrass group, I select Add a connector and search for “Docker”. I select Docker Application Deployment and hit Next.

Now I configure the parameters for the connector. I select my docker-compose.yml file on S3. The AWS Identity and Access Management (IAM) role used by the AWS IoT Greengrass group needs permissions to get the file from S3 and to get the authorization token and download the image from ECR. If you use a private repository such as Docker Hub, you can leverage the integration with AWS Secrets Manager to make it easy for your connectors and Lambda functions to use local secrets to interact with services and applications.

I deploy my changes, similarly to what I did before. This time, the new container-based application is installed and started on the AWS IoT Greengrass core device.

To test the web app that I deployed, I open access to the HTTP port on the Security Group of the EC2 instance I am using as core device. When I connect with my browser, I see the Flask app starting to count the visits. My container-based application is running on the AWS IoT Greengrass core device!

You can deploy much more complex applications than what I did in this example. Let’s see that as we go through the other feature released today.

Using the Stream Manager for AWS IoT Greengrass
For common use cases like video processing, image recognition, or high-volume data collection from sensors at the edge, you often need to build your own data stream management capabilities. The new Stream Manager simplifies this process by adding a standardized mechanism to the Greengrass Core SDK that you can use to process data streams from IoT devices, manage local data retention policies based on cache size or data age, and automatically transmit data directly into AWS cloud services such as Amazon Kinesis and AWS IoT Analytics.

The Stream Manager also handles disconnected or intermittent connectivity scenarios by adding configurable prioritization, caching policies, bandwidth utilization, and time-outs on a per-stream basis. In situations where connectivity is unpredictable or bandwidth is constrained, this new functionality enables you to define the behavior of your applications’ data management while disconnected, reconnecting, or connected, allowing you to prioritize important data’s path to the cloud and make efficient use of a connection when it is available. Using this feature, you can focus on your specific application use cases rather than building data retention and connection management functionality.

Let’s see now how the Stream Manager works with a practical use case. For example, my AWS IoT Greengrass core device is receiving lots of data from multiple devices. I want to do two things with the data I am collecting:

  • Upload all raw data with low priority to AWS IoT Analytics, where I use Amazon QuickSight to visualize and understand my data.
  • Aggregate data locally based on time and location of the devices, and send the aggregated data with high priority to a Kinesis Data Stream that is processed by a business application for predictive maintenance.

Using the Stream Manager in the Greengrass Core SDK, I create two local data streams:

  • The first local data stream has a configured low-priority export to IoT Analytics and can use up to 256MB of local disk (yes, it’s a constrained device). You can use memory to store the local data stream if you prefer speed to resilience. When local space is filled up, for example because I lost connectivity to the cloud and I continue to cache locally, I can choose to either reject new data or overwrite the oldest data.
  • The second local data stream is exporting data with high priority to a Kinesis Data Stream and can use up to 128MB of local disk (it’s aggregated data, I need less space for the same amount of time).
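The two strategies for a full local stream (reject new data or overwrite the oldest data) can be sketched with a small size-bounded buffer in Python. This is a conceptual model only and does not use the Greengrass Core SDK; the `LocalStream` class and its parameters are hypothetical.

```python
from collections import deque
from typing import Deque, List


class LocalStream:
    """Size-bounded message buffer that either rejects new data or drops the oldest."""

    def __init__(self, max_bytes: int, overwrite_oldest: bool) -> None:
        self.max_bytes = max_bytes
        self.overwrite_oldest = overwrite_oldest
        self.used = 0
        self.messages: Deque[bytes] = deque()

    def append(self, message: bytes) -> bool:
        """Store a message; return False if it was rejected."""
        if len(message) > self.max_bytes:
            return False                         # can never fit
        while self.used + len(message) > self.max_bytes:
            if not self.overwrite_oldest:
                return False                     # full: reject new data
            dropped = self.messages.popleft()    # full: overwrite oldest data
            self.used -= len(dropped)
        self.messages.append(message)
        self.used += len(message)
        return True

    def contents(self) -> List[bytes]:
        return list(self.messages)


# Tiny limits so the behavior is visible; the post uses 256MB and 128MB on disk.
rejecting = LocalStream(max_bytes=10, overwrite_oldest=False)
overwriting = LocalStream(max_bytes=10, overwrite_oldest=True)
for chunk in (b"aaaa", b"bbbb", b"cccc"):
    rejecting.append(chunk)
    overwriting.append(chunk)
print(rejecting.contents())
print(overwriting.contents())
```

While disconnected, the rejecting stream keeps the earliest data and the overwriting stream keeps the most recent data, so the choice depends on which is more valuable to the application.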

 

Here’s how the data flows in this architecture:

  • Sensor data is collected by a Producer Lambda function that is writing to the first local data stream.
  • A second Aggregator Lambda function is reading from the first local data stream, performing the aggregation, and writing its output to the second local data stream.
  • A Reader container-based app (deployed using the Docker application deployment connector) is rendering the aggregated data in real-time for a display panel.
  • The Stream Manager takes care of the ingestion to the cloud, based on the configuration and the policies of the local data streams, so that developers can focus their efforts on the logic on the device.
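The aggregation step performed by the second Lambda function could look roughly like the following Python sketch, which averages sensor values per time window and location. The reading format, the 60-second window, and the choice of an average are assumptions for illustration; reading from and writing to the local data streams is left out.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Reading = Tuple[int, str, float]  # (timestamp_seconds, location, value)


def aggregate(readings: Iterable[Reading],
              window_seconds: int = 60) -> Dict[Tuple[int, str], float]:
    """Average the values per (window start time, location)."""
    sums: Dict[Tuple[int, str], float] = defaultdict(float)
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    for timestamp, location, value in readings:
        window_start = timestamp - timestamp % window_seconds
        key = (window_start, location)
        sums[key] += value
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}


raw = [
    (0, "north", 10.0),
    (30, "north", 20.0),
    (61, "north", 30.0),
    (10, "south", 5.0),
]
summary = aggregate(raw)
print(summary)
```

Four raw readings become three aggregated values here, which is why the second local data stream needs less space for the same period of time.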

The use of Lambda functions or container-based apps in the previous architecture is just an example. You can mix and match, or standardize to one or the other, depending on your development best practices.

Available Now
The Docker application deployment connector and the Stream Manager are available with Greengrass version 1.10. The Stream Manager is available in the Greengrass Core SDK for Java and Python. We are adding support for other platforms based on customer feedback.

These new features are independent from each other, but can be used together as in my example. They can simplify the way you build and deploy applications on edge devices, making it easier to process data locally and be integrated with streaming and analytics services in the backend. Let me know what you are going to use these features for!

Danilo

New Issue of Architecture Monthly: Games

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/new-issue-of-architecture-monthly-games/

Architecture Monthly Magazine - September 2019 (Games)

This month’s Architecture Monthly magazine is all about games—not Scrabble, not Uno, not Twister, and certainly not hide-and-seek.

No, we’re talking the big business of online, multiplayer games. And did you know that approximately 90% of large, public game companies are running on the AWS cloud? Yep, I’m talking Epic (ever heard of Fortnite?), Ubisoft, Nintendo, and more. I had the opportunity to sit down with a senior tech leader for AWS Games, who talked about why companies are moving to the cloud from on-premises, and it’s about a whole lot more than just games for entertainment. We got into the big-money world of competitive eSports as well as the gamification of learning processes and economics.

Consider Twitch, often defined as Amazon’s live streaming platform for gamers. But Twitch is much more than a gaming platform. For example, AWS Live Video on Twitch offers live streaming about everything from how to develop serverless apps and robots to interactive quiz shows that help you prepare for AWS Certification exams. And of course, you can also learn about the technology that powers your favorite video games.

September’s Issue

For September’s issue, we’ve assembled architectural best practices about games from all over AWS, and we’ve made sure that a broad audience can appreciate it.

How to Access the Magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us a star rating and comment on the Amazon Kindle page or contact us anytime at [email protected].

Visualizing Sensor Data in Amazon QuickSight

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/visualizing-sensor-data-in-amazon-quicksight/

This post is courtesy of Moheeb Zara, Developer Advocate, AWS Serverless

The Internet of Things (IoT) is a term used wherever physical devices are networked in some meaningful connected way. Often, this takes the form of sensor data collection and analysis. As the number of devices and the size of the data scale, it can become costly and difficult to keep up with demand.

Using AWS Serverless Application Model (AWS SAM), you can reduce the cost and time to market of an IoT solution. This guide demonstrates how to collect and visualize data from a low-cost, Wi-Fi connected IoT device using a variety of AWS services. Much of this can be accomplished within the AWS Free Usage Tier, which is necessary for the following instructions.

Services used

The following services are used in this example:

What’s covered in this post?

This post covers:

  • Connecting an Arduino MKR 1010 Wi-Fi device to AWS IoT Core.
  • Forwarding messages from an AWS IoT Core topic stream to a Lambda function.
  • Using a Kinesis Data Firehose delivery stream to store data in S3.
  • Analyzing and visualizing data stored in S3 using Amazon QuickSight.

Connect the device to AWS IoT Core using MQTT

The Arduino MKR 1010 is a low-cost, Wi-Fi enabled, IoT device, shown in the following image.

An Arduino MKR 1010 Wi-Fi microcontroller

Its analog and digital input and output pins can be used to read sensors or to write to actuators. Arduino provides a detailed guide on how to securely connect this device to AWS IoT Core. The following steps build upon it to push arbitrary sensor data to a topic stream and ultimately visualize that data using Amazon QuickSight.

  1. Start by following this comprehensive guide to using an Arduino MKR 1010 with AWS IoT Core. Upon completion, your device is connected to AWS IoT Core using MQTT (Message Queuing Telemetry Transport), a protocol for publishing and subscribing to messages using topics.
  2. In the Arduino IDE, choose File, Sketch, Include Library, and Manage Libraries.
  3. In the window that opens, search for ArduinoJson and select the library by Benoit Blanchon. Choose Install.

  4. Add #include <ArduinoJson.h> to the top of your sketch from the Arduino guide.
  5. Modify the publishMessage() function with this code. It publishes a JSON message with two keys: time (ms) and the current value read from the first analog pin.

void publishMessage() {  
  Serial.println("Publishing message");

  // send message, the Print interface can be used to set the message contents
  mqttClient.beginMessage("arduino/outgoing");
  
  // create json message to send
  StaticJsonDocument<200> doc;
  doc["time"] = millis();
  doc["sensor_a0"] = analogRead(0);
  serializeJson(doc, mqttClient); // print to client
  
  mqttClient.endMessage();
}

  6. Save and upload the sketch to your board.

Create a Kinesis Firehose delivery stream

Amazon Kinesis Data Firehose is a service that reliably loads streaming data into data stores, data lakes, and analytics tools. Amazon QuickSight requires a data store to create visualizations of the sensor data. This simple Kinesis Data Firehose delivery stream continuously uploads data to an S3 storage bucket. The next sections cover how to add records to this stream using a Lambda function.

  1. In the Kinesis Data Firehose console, create a new delivery stream, called SensorDataStream.
  2. Leave the default source as Direct PUT or other sources and choose Next.
  3. On the next screen, leave all the default values and choose Next.
  4. Select Amazon S3 as the destination and create a new bucket with a unique name. This is where records are continuously uploaded so that they can be used by Amazon QuickSight.
  5. On the next screen, choose Create New IAM Role, Allow. This gives the Firehose delivery stream permission to upload to S3.
  6. Review and then choose Create Delivery Stream.

It can take some time to fully create the stream. In the meantime, continue on to the next section.

Invoking Lambda using AWS IoT Core rules

Using AWS IoT Core rules, you can forward messages from devices to a Lambda function, which can perform actions such as uploading to an Amazon DynamoDB table or an S3 bucket, or running data against various Amazon Machine Learning services. In this case, the function transforms and adds a message to the Kinesis Data Firehose delivery stream, which then adds that data to S3.

AWS IoT Core rules use the MQTT topic stream to trigger interactions with other AWS services. An AWS IoT Core rule is created by using an SQL statement, a topic filter, and a rule action. The Arduino example publishes messages every five seconds on the topic arduino/outgoing. The following instructions show how to consume those messages with a Lambda function.
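Before moving on, it may help to see how a topic filter selects messages. The rule in this post uses the exact topic arduino/outgoing, but MQTT topic filters can also contain the single-level wildcard + and the multi-level wildcard #. The Python sketch below implements the standard MQTT matching rules for illustration; the function name is hypothetical, and it does not cover AWS IoT specifics such as reserved topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a topic filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level, matches the rest.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False                       # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False                       # literal level does not match
    return len(filter_levels) == len(topic_levels)


print(topic_matches("arduino/outgoing", "arduino/outgoing"))
print(topic_matches("arduino/+", "arduino/outgoing"))
print(topic_matches("arduino/#", "arduino/outgoing/status"))
print(topic_matches("arduino/outgoing", "arduino/incoming"))
```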

Create a Lambda function

Before creating an AWS IoT Core rule, you need a Lambda function to consume forwarded messages.

  1. In the AWS Lambda console, choose Create function.
  2. Name the function ArduinoConsumeMessage.
  3. Choose Author from scratch and, for Runtime, choose Node.js 10.x. For Execution role, choose Create a new role with basic Lambda permissions. Choose Create.
  4. On the Execution role card, choose View the ArduinoConsumeMessage-role-xxxx on the IAM console.
  5. Choose Attach Policies. Then, search for and select AmazonKinesisFirehoseFullAccess.
  6. Choose Attach Policy. This applies the necessary permissions to add records to the Firehose delivery stream.
  7. In the Lambda console, in the Designer card, select the function name.
  8. Paste the following in the code editor, replacing SensorDataStream with the name of your own Firehose delivery stream. Choose Save.
const AWS = require('aws-sdk')

const firehose = new AWS.Firehose()
const StreamName = "SensorDataStream"

exports.handler = async (event) => {
    
    console.log('Received IoT event:', JSON.stringify(event, null, 2))
    
    let payload = {
        time: new Date(event.time),
        sensor_value: event.sensor_a0
    }
    
    let params = {
            DeliveryStreamName: StreamName,
            Record: { 
                Data: JSON.stringify(payload)
            }
        }
        
    return await firehose.putRecord(params).promise()

}
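For readers who prefer Python, the transformation performed by the function above can be sketched as follows. This mirrors only the payload construction (a timestamp rendered as an ISO 8601 string and the analog reading renamed to sensor_value) and stops short of calling Kinesis Data Firehose; the `to_firehose_record` name is hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def to_firehose_record(event: Dict[str, Any]) -> Dict[str, bytes]:
    """Build the Record parameter for a Firehose put-record call from an IoT message."""
    millis = int(event["time"])
    # Integer arithmetic avoids floating-point rounding of the millisecond part.
    moment = datetime.fromtimestamp(millis // 1000, tz=timezone.utc).replace(
        microsecond=(millis % 1000) * 1000)
    payload = {
        "time": moment.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
        "sensor_value": event["sensor_a0"],
    }
    return {"Data": json.dumps(payload).encode("utf-8")}


record = to_firehose_record({"time": 1567023375013, "sensor_a0": 456})
print(record["Data"].decode("utf-8"))
```

The time value is interpreted as milliseconds since the Unix epoch, as in the test payload used later in this post.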

Create an AWS IoT Core rule

To create an AWS IoT Core rule, follow these steps.

  1. In the AWS IoT console, choose Act.
  2. Choose Create.
  3. For Rule query statement, copy and paste SELECT * FROM 'arduino/outgoing'. This subscribes to the outgoing message topic used in the Arduino example.
  4. Choose Add action, Send a message to a Lambda function, Configure action.
  5. Select the function created in the last set of instructions.
  6. Choose Create rule.

At this stage, any message published to the arduino/outgoing topic forwards to the ArduinoConsumeMessage Lambda function, which transforms and puts the payload on the Kinesis Data Firehose stream and also logs the message to Amazon CloudWatch. If you’ve connected an Arduino device to AWS IoT Core, it publishes to that topic every five seconds.

The following steps show how to test functionality using the AWS IoT console.

  1. In the AWS IoT console, choose Test.
  2. For Publish, enter the topic arduino/outgoing.
  3. Enter the following test payload:
    {
      "time": 1567023375013,
      "sensor_a0": 456
    }
  4. Choose Publish to topic.
  5. Navigate back to your Lambda function.
  6. Choose Monitoring, View logs in CloudWatch.
  7. Select a log item to view the message contents, as shown in the following screenshot.
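
The transformation that the Lambda function applies to the test payload can also be checked locally before publishing. The following is a minimal sketch, assuming the same payload shape as the test message above. The helper name toFirehoseRecord is hypothetical and not part of the original function, and no AWS call is made.

```javascript
// Hypothetical helper mirroring the payload mapping in the Lambda handler.
// It only builds the record body; nothing is sent to Firehose here.
function toFirehoseRecord(event) {
    const payload = {
        time: new Date(event.time),      // epoch milliseconds -> Date
        sensor_value: event.sensor_a0
    }
    // JSON.stringify serializes the Date as an ISO 8601 string.
    return JSON.stringify(payload)
}

const testEvent = { time: 1567023375013, sensor_a0: 456 }
console.log(toFirehoseRecord(testEvent))
```

The printed string is what would be placed in the Data field of the Firehose record, with the timestamp in ISO 8601 form.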

Visualizing data with Amazon QuickSight

To visualize data with Amazon QuickSight, follow these steps.

  1. In the Amazon QuickSight console, sign up.
  2. Choose Manage Data, New Data Set. Select S3 as the data source.
  3. A manifest file is necessary for Amazon QuickSight to be able to fetch data from your S3 bucket. Copy the following into a file named manifest.json. Replace YOUR-BUCKET-NAME with the name of the bucket created for the Firehose delivery stream.
    {
       "fileLocations":[
          {
             "URIPrefixes":[
                "s3://YOUR-BUCKET-NAME/"
             ]
          }
       ],
       "globalUploadSettings":{
          "format":"JSON"
       }
    }
  4. Upload the manifest.json file.
  5. Choose Connect, then Visualize. You may have to give Amazon QuickSight explicit permissions to your S3 bucket.
  6. Finally, design the Amazon QuickSight visualizations in the drag and drop editor. Drag the two available fields into the center card to generate a Sum of Sensor_value by Time visual.

Conclusion

This post demonstrated visualizing data from a securely connected remote IoT device. This was achieved by connecting an Arduino to AWS IoT Core using MQTT, forwarding messages from the topic stream to Lambda using IoT Core rules, putting records on an Amazon Kinesis Data Firehose delivery stream, and using Amazon QuickSight to visualize the data stored within an S3 bucket.

With these building blocks, it is possible to implement highly scalable and customizable IoT data collection, analysis, and visualization. With the use of other AWS services, you can build a full end-to-end platform for an IoT product that can reliably handle volume. To further explore how hardware and AWS Serverless can work together, visit the Amazon Web Services page on Hackster.

One to Many: Evolving VPC Design

Post Syndicated from Androski Spicer original https://aws.amazon.com/blogs/architecture/one-to-many-evolving-vpc-design/

Since its inception, the Amazon Virtual Private Cloud (VPC) has acted as the embodiment of security and privacy for customers who are looking to run their applications in a controlled, private, secure, and isolated environment.

This logically isolated space has evolved, and in its evolution has increased the avenues that customers can take to create and manage multi-tenant environments with multiple integration points for access to resources on-premises.

This blog is a two-part series that begins with a look at the Amazon VPC as a single unit of networking in the AWS Cloud but eventually takes you to a world in which simplified architectures for establishing a global network of VPCs are possible.

From One VPC: Single Unit of Networking

To be successful with the AWS Virtual Private Cloud, you first have to define success for today and what success might look like as your organization’s adoption of the AWS cloud increases and matures. In essence, your VPCs should be designed to satisfy the needs of your applications today and must be scalable to accommodate future needs.

Classless Inter-Domain Routing (CIDR) notations are used to denote the size of your VPC. AWS allows you to specify a CIDR block between /16 and /28. The largest, /16, provides you with 65,536 IP addresses, and the smallest allowed CIDR block, /28, provides you with 16 IP addresses. Note that the first four IP addresses and the last IP address in each subnet CIDR block are not available for you to use and cannot be assigned to an instance.
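
The arithmetic behind these numbers can be sketched in a few lines. This is an illustrative calculation only; the function names are hypothetical, and the sketch assumes an IPv4 prefix length in the /16 to /28 range described above.

```javascript
// Addresses AWS reserves in every subnet: the first four and the last one.
const RESERVED_PER_SUBNET = 5

// Total IPv4 addresses in a CIDR block with the given prefix length.
function totalAddresses(prefixLength) {
    return 2 ** (32 - prefixLength)
}

// Addresses that can actually be assigned in a subnet of that size.
function usableSubnetAddresses(prefixLength) {
    return totalAddresses(prefixLength) - RESERVED_PER_SUBNET
}

for (const prefix of [16, 24, 28]) {
    console.log(`/${prefix}: ${totalAddresses(prefix)} total, ` +
        `${usableSubnetAddresses(prefix)} usable`)
}
```

A /28 subnet, for example, has 16 addresses in total but only 11 that can be assigned, which matters when sizing small subnets.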

AWS VPC supports both IPv4 and IPv6. It is required that you specify an IPv4 CIDR range when creating a VPC. Specifying an IPv6 range is optional.

Customers can specify ANY IPv4 address space for their VPC. This includes but is not limited to RFC 1918 addresses.

After creating your VPC, you divide it into subnets. In an AWS VPC, subnets are not isolation boundaries around your application. Rather, they are containers for routing policies.

Isolation is achieved by attaching an AWS Security Group (SG) to the EC2 instances that host your application. SGs are stateful firewalls, meaning that connections are tracked to ensure return traffic is allowed. They control inbound and outbound access to the elastic network interfaces that are attached to an EC2 instance. These should be tightly configured, only allowing access as needed.

It is our best practice that subnets should be created in categories. There are two main categories: public subnets and private subnets. At minimum, they should be designed as outlined in the below diagrams for IPv4 and IPv6 subnet design.

Recommended IPv4 subnet design pattern

Recommended IPv6 subnet design pattern

Subnet types are denoted by whether applications and users on the internet can directly initiate access to infrastructure within a subnet.

Public Subnets

Public subnets are attached to a route table that has a default route to the Internet via an Internet gateway.

Resources in a public subnet can have a public IP or Elastic IP (EIP) that has a NAT to the Elastic Network Interface (ENI) of the virtual machines or containers that host your application(s). This is a one-to-one NAT that is performed by the Internet gateway.

Illustration of public subnet access path to the Internet through the Internet Gateway (IGW)

Private Subnets

A private subnet contains infrastructure that isn’t directly accessible from the Internet. Unlike the public subnet, this infrastructure only has private IPs.

Infrastructure in a private subnet gains access to resources or users on the Internet through a NAT infrastructure of sorts.

AWS natively provides NAT capability through the use of the NAT Gateway service. Customers can also create NAT instances that they manage or leverage third-party NAT appliances from the AWS Marketplace.

In most scenarios, it is recommended to use the AWS NAT Gateway as it is highly available (in a single Availability Zone) and is provided as a managed service by AWS. It supports 5 Gbps of bandwidth per NAT gateway and automatically scales up to 45 Gbps.

An AWS NAT gateway’s high availability is confined to a single Availability Zone. For high availability across AZs, it is recommended to have a minimum of two NAT gateways (in different AZs). This allows you to switch to an available NAT gateway in the event that one should become unavailable.

This approach allows you to zone your Internet traffic, reducing cross Availability Zone connections to the Internet. More details on NAT gateway are available here.

Illustration of an environment with a single NAT Gateway (NAT-GW)

Illustration of high availability with multiple NAT Gateways (NAT-GW) attached to their own route table

Illustration of the failure of one NAT Gateway and the fail over to an available NAT Gateway by the manual changing of the default route next hop in private subnet A route table
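
The manual failover described above amounts to replacing the default route in the private subnet's route table. The following sketch only builds the parameters for an EC2 ReplaceRoute call. The function name and the resource IDs are placeholders for illustration, and the commented SDK call assumes the AWS SDK for JavaScript (v2).

```javascript
// Builds the parameters for an EC2 ReplaceRoute call that points the
// default IPv4 route of a route table at a different NAT gateway.
function buildNatFailoverParams(routeTableId, standbyNatGatewayId) {
    return {
        RouteTableId: routeTableId,
        DestinationCidrBlock: '0.0.0.0/0',
        NatGatewayId: standbyNatGatewayId
    }
}

// Placeholder IDs, for illustration only.
const failoverParams = buildNatFailoverParams(
    'rtb-0123456789abcdef0',
    'nat-0fedcba9876543210'
)
console.log(failoverParams)

// With the AWS SDK for JavaScript (v2), this object would be passed as:
//   const AWS = require('aws-sdk')
//   await new AWS.EC2().replaceRoute(failoverParams).promise()
```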

AWS-allocated IPv6 addresses are Global Unicast Addresses by default. That said, you can privatize these subnets by using an Egress-Only Internet Gateway (E-IGW) instead of a regular Internet gateway. E-IGWs are purpose-built to prevent users and applications on the Internet from initiating access to infrastructure in your IPv6 subnet(s).

Illustration of internet access for hybrid IPv6 subnets through an Egress-Only Internet Gateway (E-IGW)

Applications hosted on instances living within a private subnet can have different access needs. Some require access to the Internet while others require access to databases, applications, and users that are on-premises. For this type of access, AWS provides two avenues: the Virtual Gateway and the Transit Gateway. The Virtual Gateway can only support a single VPC at a time, while the Transit Gateway is built to simplify the interconnectivity of tens to hundreds of VPCs and then aggregate their connectivity to resources on-premises. Given that we are looking at the VPC as a single unit of networking, all diagrams below contain illustrations of the Virtual Gateway, which acts as a WAN concentrator for your VPC.

Illustration of private subnets connecting to data center via a Virtual Gateway (VGW)

 

Illustration of private subnets connecting to Data Center via a VGW

 

Illustration of private subnets connecting to Data Center using AWS Direct Connect as primary and IPsec as backup

The above diagram illustrates a WAN connection between a VGW attached to a VPC and a customer’s data center.

AWS provides two options for establishing private connectivity between your VPC and on-premises network: AWS Direct Connect and AWS Site-to-Site VPN.

AWS Site-to-Site VPN configuration leverages IPSec, with each connection providing two redundant IPSec tunnels. AWS supports both static routing and dynamic routing (through the use of BGP).

BGP is recommended, as it allows dynamic route advertisement, high availability through failure detection, and fail over between tunnels in addition to decreased management complexity.

VPC Endpoints: Gateway & Interface Endpoints

Applications running inside your subnet(s) may need to connect to AWS public services (like Amazon S3, Amazon Simple Notification Service (SNS), Amazon Simple Queue Service (SQS), Amazon API Gateway, etc.) or applications in another VPC that lives in another account. For example, you may have a database in one account that you would like to expose to applications that live in a completely different account and subnet.

For these scenarios you have the option to leverage an Amazon VPC Endpoint.

There are two types of VPC Endpoints: Gateway Endpoints and Interface Endpoints.

Gateway Endpoints only support Amazon S3 and Amazon DynamoDB. Upon creation, a gateway is added to your specified route table(s) and acts as the destination for all requests to the service it is created for.

Interface Endpoints differ significantly and can only be created for services that are powered by AWS PrivateLink.

Upon creation, AWS creates an interface endpoint consisting of one or more Elastic Network Interfaces (ENIs). Each AZ can support one interface endpoint ENI. This acts as a point of entry for all traffic destined to a specific PrivateLink service.

When an interface endpoint is created, associated DNS entries are created that point to the endpoint and each ENI that the endpoint contains. To access the PrivateLink service you must send your request to one of these hostnames.

As illustrated below, ensure the Private DNS feature is enabled for AWS public and Marketplace services:

Since interface endpoints leverage ENIs, customers can use cloud techniques they are already familiar with. The interface endpoint can be configured with a restrictive security group. These endpoints can also be easily accessed from both inside and outside the VPC. Access from outside a VPC can be accomplished through Direct Connect and VPN.

Illustration of a solution that leverages an interface and gateway endpoint

Customers can also create AWS Endpoint services for their applications or services running on-premises. This allows access to these services via an interface endpoint which can be extended to other VPCs (even if the VPCs themselves do not have Direct Connect configured).

VPC Sharing

At re:Invent 2018, AWS launched the feature VPC sharing, which helps customers control VPC sprawl by decoupling the boundary of an AWS account from the underlying VPC network that supports its infrastructure.

VPC sharing uses Amazon Resource Access Manager (RAM) to share subnets across accounts within the same AWS organization.

VPC sharing is defined as:

VPC sharing allows customers to centralize the management of the network, its IP space, and the access paths to resources external to the VPC. This method of centralization and reuse (of VPC components such as NAT Gateway and Direct Connect connections) results in a reduction of cost to manage and maintain this environment.

Great, but there are times when a customer needs to build networks with multiple VPCs in and across AWS regions. How should this be done and what are the best practices?

This will be answered in part two of this blog.

 

 

ICYMI: Serverless Q2 2019

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/icymi-serverless-q2-2019/

This post is courtesy of Moheeb Zara, Senior Developer Advocate – AWS Serverless

Welcome to the sixth edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

April - June 2019

Amazon EventBridge

Before we dive into all that happened in Q2, we’re excited about this quarter’s launch of Amazon EventBridge, the serverless event bus that connects application data from your own apps, SaaS applications, and AWS services. This allows you to create powerful event-driven serverless applications using a variety of event sources.

Our very own AWS Solutions Architect, Mike Deck, sat down with AWS Serverless Hero Jeremy Daly and recorded a podcast on Amazon EventBridge. It’s a worthy listen if you’re interested in exploring all the features offered by this launch.

Now, back to Q2, here’s what’s new.

AWS Lambda

Lambda Monitoring

Amazon CloudWatch Logs Insights now allows you to see statistics from recent invocations of your Lambda functions in the Lambda monitoring tab.

Additionally, as of June, you can monitor the Lambda@Edge functions associated with your Amazon CloudFront distributions directly from your Amazon CloudFront console. This includes a revamped monitoring dashboard for CloudFront distributions and Lambda@Edge functions.

AWS Step Functions

Step Functions

AWS Step Functions now supports workflow execution events, which help in the building and monitoring of event-driven serverless workflows. Execution event notifications can be delivered automatically to CloudWatch Events/Amazon EventBridge when a workflow execution starts or completes. This allows services such as AWS Lambda, Amazon SNS, Amazon Kinesis, or AWS Step Functions to respond to these events.

Additionally you can use callback patterns to automate workflows for applications with human activities and custom integrations with third-party services. You create callback patterns in minutes with less code to write and maintain, run without servers and infrastructure to manage, and scale reliably.

Amazon API Gateway

API Gateway Tag Based Control

Amazon API Gateway now offers tag-based access control for WebSocket APIs using AWS Identity and Access Management (IAM) policies, allowing you to categorize API Gateway resources for WebSocket APIs by purpose, owner, or other criteria. With the addition of tag-based access control to WebSocket resources, you can now give permissions to WebSocket resources at various levels by creating policies based on tags. For example, you can grant full access to admins while limiting access to developers.

You can now enforce a minimum Transport Layer Security (TLS) version and cipher suites through a security policy for connecting to your Amazon API Gateway custom domain.

In addition, Amazon API Gateway now allows you to define VPC Endpoint policies, enabling you to specify which Private APIs a VPC Endpoint can connect to. This enables granular security control using VPC Endpoint policies.

AWS Amplify

Amplify CLI (part of the open source Amplify Framework) now includes support for adding and configuring AWS Lambda triggers for events when using Amazon Cognito, Amazon Simple Storage Service, and Amazon DynamoDB as event sources. This means you can set up custom authentication flows for mobile and web applications via the Amplify CLI and Amazon Cognito User Pool as an authentication provider.

Amplify Console

Amplify Console, a Git-based workflow for continuous deployment and hosting for fullstack serverless web apps, launched several updates to the build service including SAM CLI and custom container support.

Amazon Kinesis

Amazon Kinesis Data Firehose can now utilize AWS PrivateLink to securely ingest data. AWS PrivateLink provides private connectivity between VPCs, AWS services, and on-premises applications, securely over the Amazon network. When AWS PrivateLink is used with Amazon Kinesis Data Firehose, all traffic to a Kinesis Data Firehose from a VPC flows over a private connection.

You can now assign AWS resource tags to applications in Amazon Kinesis Data Analytics. These key/value tags can be used to organize and identify resources, create cost allocation reports, and control access to resources within Amazon Kinesis Data Analytics.

Amazon Kinesis Data Firehose is now available in the AWS GovCloud (US-East), Europe (Stockholm), Asia Pacific (Seoul), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), and EU (London) regions.

For a complete list of where Amazon Kinesis Data Analytics is available, please see the AWS Region Table.

AWS Cloud9

Cloud9 Quick Starts

Amazon Web Services (AWS) Cloud9 integrated development environment (IDE) now has a Quick Start which deploys in the AWS cloud in about 30 minutes. This enables organizations to provide developers a powerful cloud-based IDE that can edit, run, and debug code in the browser and allow easy sharing and collaboration.

AWS Cloud9 is also now available in the EU (Frankfurt) and Asia Pacific (Tokyo) regions. For a current list of supported regions, see AWS Regions and Endpoints in the AWS documentation.

Amazon DynamoDB

You can now tag Amazon DynamoDB tables when you create them. Tags are labels you can attach to AWS resources to make them easier to manage, search, and filter. Tagging support has also been extended to the AWS GovCloud (US) Regions.

DynamoDBMapper now supports Amazon DynamoDB transactional API calls. This support is included within the AWS SDK for Java. These transactional APIs provide developers atomic, consistent, isolated, and durable (ACID) operations to help ensure data correctness.

Amazon DynamoDB now applies adaptive capacity in real time in response to changing application traffic patterns, which helps you maintain uninterrupted performance indefinitely, even for imbalanced workloads.

AWS Training and Certification has launched Amazon DynamoDB: Building NoSQL Database–Driven Applications, a new self-paced, digital course available exclusively on edX.

Amazon Aurora

Amazon Aurora Serverless MySQL 5.6 can now be accessed using the built-in Data API, enabling you to access Aurora Serverless with web services-based applications, including AWS Lambda, AWS AppSync, and AWS Cloud9. For more, check out this post.

Sharing snapshots of Aurora Serverless DB clusters with other AWS accounts or publicly is now possible. We are also giving you the ability to copy Aurora Serverless DB cluster snapshots across AWS regions.

You can now set the minimum capacity of your Aurora Serverless DB clusters to 1 Aurora Capacity Unit (ACU). With Aurora Serverless, you specify the minimum and maximum ACUs for your Aurora Serverless DB cluster instead of provisioning and managing database instances. Each ACU is a combination of processing and memory capacity. By setting the minimum capacity to 1 ACU, you can keep your Aurora Serverless DB cluster running at a lower cost.

AWS Serverless Application Repository

The AWS Serverless Application Repository is now available in 17 regions with the addition of the AWS GovCloud (US-West) region.

Region support includes Asia Pacific (Mumbai, Singapore, Sydney, Tokyo), Canada (Central), EU (Frankfurt, Ireland, London, Paris, Stockholm), South America (São Paulo), US West (N. California, Oregon), and US East (N. Virginia, Ohio).

Amazon Cognito

Amazon Cognito has launched a new API – AdminSetUserPassword – for the Cognito User Pool service that provides a way for administrators to set temporary or permanent passwords for their end users. This functionality is available for end users even when their verified phone or email are unavailable.

Serverless Posts

April

May

June

Events

Events this quarter

Senior Developer Advocates for AWS Serverless spoke at several conferences this quarter. Here are some recordings worth watching!

Tech Talks

We hold several AWS Online Tech Talks covering serverless tech talks throughout the year, so look out for them in the Serverless section of the AWS Online Tech Talks page. Here are the ones from Q2.

Twitch

Twitch Series

In April, we started a 13-week deep dive into building APIs on AWS as part of our Twitch Build On series. The Building Happy Little APIs series covers the common and not-so-common use cases for APIs on AWS and the features available to customers as they look to build secure, scalable, efficient, and flexible APIs.

There are also a number of other helpful video series covering Serverless available on the AWS Twitch Channel.

Build with Serverless on Twitch

Serverless expert and AWS Specialist Solutions Architect, Heitor Lessa, has been hosting a weekly Twitch series since April. Join him and others as they build an end-to-end airline booking solution using serverless. The final episode airs on Wednesday, August 7th at 8:00am PT.

Here’s a recap of the last quarter:

AWS re:Invent

AWS re:Invent 2019

AWS re:Invent 2019 is around the corner! From December 2 – 6 in Las Vegas, Nevada, join tens of thousands of AWS customers to learn, share ideas, and see exciting keynote announcements. Be sure to take a look at the growing catalog of serverless sessions this year.

Register for AWS re:Invent now!

What did we do at AWS re:Invent 2018? Check out our recap here: AWS re:Invent 2018 Recap at the San Francisco Loft

AWS Serverless Heroes

We urge you to explore the efforts of our AWS Serverless Heroes Community. This is a worldwide network of AWS Serverless experts with a diverse background of experience. For example, check out this post from last month where Marcia Villalba demonstrates how to set up unit tests for serverless applications.

Still looking for more?

The Serverless landing page has lots of information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

Understanding the Different Ways to Invoke Lambda Functions

Post Syndicated from George Mao original https://aws.amazon.com/blogs/architecture/understanding-the-different-ways-to-invoke-lambda-functions/

In our first post, we talked about general design patterns to enable massive scale with serverless applications. In this post, we’ll review the different ways you can invoke Lambda functions and what you should be aware of with each invocation model.

Synchronous Invokes

Synchronous invocations are the most straightforward way to invoke your Lambda functions. In this model, your functions execute immediately when you perform the Lambda Invoke API call. This can be accomplished through a variety of options, including using the CLI or any of the supported SDKs.

Here is an example of a synchronous invoke using the CLI:

aws lambda invoke --function-name MyLambdaFunction --invocation-type RequestResponse --payload '[JSON string here]' response.json

The --invocation-type flag specifies a value of “RequestResponse”. This instructs AWS to execute your Lambda function and wait for the function to complete. When you perform a synchronous invoke, you are responsible for checking the response, determining whether there was an error, and deciding whether you should retry the invoke.
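
Checking the response of a synchronous invoke could look like the following sketch. It assumes the response shape returned by the Invoke API (StatusCode, an optional FunctionError, and a Payload). The function name interpretInvokeResponse and the sample responses are hypothetical, and the SDK call is shown only as a comment.

```javascript
// Interprets the response object of a synchronous (RequestResponse) invoke.
// A 200 status only means Lambda ran the function; a function-level failure
// is reported through the FunctionError field.
function interpretInvokeResponse(response) {
    const body = response.Payload
        ? JSON.parse(response.Payload.toString())
        : null
    if (response.FunctionError) {
        return { ok: false, error: body }
    }
    return { ok: true, result: body }
}

// Hand-written sample responses, shaped like the Invoke API response.
const success = { StatusCode: 200, Payload: '{"message":"done"}' }
const failure = {
    StatusCode: 200,
    FunctionError: 'Unhandled',
    Payload: '{"errorMessage":"boom"}'
}

console.log(interpretInvokeResponse(success))
console.log(interpretInvokeResponse(failure))

// With the AWS SDK for JavaScript (v2), the response would come from:
//   const AWS = require('aws-sdk')
//   const response = await new AWS.Lambda().invoke({
//       FunctionName: 'MyLambdaFunction',
//       InvocationType: 'RequestResponse',
//       Payload: JSON.stringify({ key: 'value' })
//   }).promise()
```

Whether to retry after a failed result is left to the caller, since the service does not retry synchronous invokes on your behalf.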

Many AWS services can emit events that trigger Lambda functions. Here is a list of services that invoke Lambda functions synchronously:

Asynchronous Invokes

Here is an example of an asynchronous invoke using the CLI:

aws lambda invoke --function-name MyLambdaFunction --invocation-type Event --payload '[JSON string here]' response.json

Notice that the --invocation-type flag specifies “Event”. If your function returns an error, AWS will automatically retry the invoke twice, for a total of three invocations.
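
The retry count can be illustrated with a toy model. This is not how the Lambda service is implemented; it is a simplified, synchronous sketch of "one initial attempt plus up to two retries", and all names in it are hypothetical.

```javascript
// Toy model of the retry behavior described above: one initial attempt
// plus up to two retries. Not the Lambda service's actual implementation.
function simulateAsyncInvoke(handler, event, maxRetries = 2) {
    let attempts = 0
    let lastError = null
    while (attempts <= maxRetries) {
        attempts += 1
        try {
            return { attempts, result: handler(event) }
        } catch (err) {
            lastError = err
        }
    }
    return { attempts, error: lastError.message }
}

// A handler that always fails is attempted three times in total.
const alwaysFails = () => { throw new Error('boom') }
console.log(simulateAsyncInvoke(alwaysFails, {}))

// A handler that succeeds is attempted once.
console.log(simulateAsyncInvoke((e) => e.value * 2, { value: 21 }))
```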

Here is a list of services that invoke Lambda functions asynchronously:

Asynchronous invokes place your invoke request in the Lambda service queue, and we process the requests as they arrive. You should use AWS X-Ray to review how long your request spent in the service queue by checking the “dwell time” segment.

Poll based Invokes

This invocation model is designed to allow you to integrate with AWS Stream and Queue based services with no code or server management. Lambda will poll the following services on your behalf, retrieve records, and invoke your functions. The following are supported services:

AWS manages the poller on your behalf and performs synchronous invokes of your function with this type of integration. The retry behavior for this model is based on data expiration in the data source. For example, Kinesis Data Streams stores records for 24 hours by default (up to 168 hours). The specific details of each integration are linked above.
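
As an illustration of what a function receives in this model, the sketch below decodes a batch of Kinesis Data Streams records, whose payloads arrive base64-encoded under record.kinesis.data. The helper name decodeKinesisRecords and the sample event are hypothetical; a real event carries additional fields that are omitted here.

```javascript
// Decodes the base64-encoded payload of every record in a Kinesis batch.
function decodeKinesisRecords(event) {
    return event.Records.map((record) =>
        Buffer.from(record.kinesis.data, 'base64').toString('utf8')
    )
}

// In a Lambda function this would be exported as exports.handler.
const handler = async (event) => {
    const messages = decodeKinesisRecords(event)
    console.log('Decoded batch:', messages)
    return messages
}

// Hand-built sample event with a single record, for local experimentation.
const sampleEvent = {
    Records: [
        {
            kinesis: {
                data: Buffer.from('{"sensor_value":456}').toString('base64')
            }
        }
    ]
}
handler(sampleEvent)
```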

Conclusion

In our next post, we’ll provide some tips and best practices for developing Lambda functions. Happy coding!

 

About the Author

George Mao is a Specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. George is responsible for helping customers design and operate Serverless applications using services like Lambda, API Gateway, Cognito, and DynamoDB. He is a regular speaker at AWS Summits, re:Invent, and various tech events. George is a software engineer and enjoys contributing to open source projects, delivering technical presentations at technology events, and working with customers to design their applications in the Cloud. George holds a Bachelor of Computer Science and Masters of IT from Virginia Tech.