Today, AWS is introducing Synchronous Express Workflows for AWS Step Functions. This is a new way to run Express Workflows to orchestrate AWS services at high-throughput.
Developers have been using asynchronous Express Workflows since December 2019 for workloads that require higher event rates and shorter durations. Customers were looking for ways to receive an immediate response from their Express Workflows without having to write additional code or introduce additional services.
What’s new?
Synchronous Express Workflows allow developers to quickly receive the workflow response without needing to poll additional services or build a custom solution. This is useful for high-volume microservice orchestration and fast compute tasks that communicate via HTTPS.
Choose Start execution. Here you have an option to run the Express Workflow as a synchronous or asynchronous type.
Choose Synchronous and choose Start execution.
Expand Details in the results message to view the output.
Monitoring, logging and tracing
Enable logging to inspect and debug Synchronous Express Workflows. All execution history is sent to CloudWatch Logs. Use the Monitoring and Logging tabs in the Step Functions console to gain visibility into Express Workflow executions.
The Monitoring tab shows six graphs with CloudWatch metrics for Execution Errors, Execution Succeeded, Execution Duration, Billed Duration, Billed Memory, and Executions Started. The Logging tab shows recent logs and the logging configuration, with a link to CloudWatch Logs.
Enable X-Ray tracing to view trace maps and timelines of the underlying components that make up a workflow. This helps to discover performance issues, detect permission problems, and track requests made to and from other AWS services.
Creating an example workflow
The following example uses Amazon API Gateway HTTP APIs to start an Express Workflow synchronously. The workflow analyses web form submissions for negative sentiment. It generates a case reference number and saves the data in an Amazon DynamoDB table. The workflow returns the case reference number and message sentiment score.
The API endpoint is generated by an API Gateway HTTP APIs. A POST request is made to the API which invokes the workflow. It contains the contact form’s message body.
$ git clone https://github.com/aws-samples/contact-form-processing-with-synchronous-express-workflows.git
$ cd contact-form-processing-with-synchronous-express-workflows
$ sam build
$ sam deploy -g
This deploys 12 resources, including a Synchronous Express Workflow, three Lambda functions, an API Gateway HTTP API endpoint, and all the AWS Identity & Access Management (IAM) roles and permissions required for the application to run.
Note the HTTP APIs endpoint and workflow ARN outputs.
aws stepfunctions start-sync-execution \
--state-machine-arn <your-workflow-arn> \
--input "{\"message\" : \"This is bad service\"}"
The response is received once the workflow completes. It contains the workflow output (sentiment and ticketid), the executionARN, and some execution metadata.
Starting the workflow from HTTP API Gateway:
The application deploys an API Gateway HTTP API, with a Step Functions integration. This is configured in the api.yaml file. It starts the state machine with the POST body provided as the input payload.
Trigger the workflow with a POST request, using the API HTTP API endpoint generated from the deploy step. Enter the following CURL command into the terminal:
curl --location --request POST '<YOUR-HTTP-API-ENDPOINT>' \
--header 'Content-Type: application/json' \
--data-raw '{"message":" This is bad service"}'
The POST request returns a 200 status response. The output field of the response contains the sentiment results (negative) and the generated ticketId (jc4t8i).
Putting it all together
You can use this application to power a web form backend to help expedite customer complaints. In the following example, a frontend application submits form data via an AJAX POST request. The application waits for the response, and presents the user with a message appropriate to the detected sentiment, and a case reference number.
If a negative sentiment is returned in the API response, the user is informed of their case number:
Setting IAM permissions
Before a user or service can start a Synchronous Express Workflow, it must be granted permission to perform the states:StartSyncExecution API operation. This is a new state-machine level permission. Existing Express Workflows can be run synchronously once the correct IAM permissions for StartSyncExecution are granted.
Step Functions Synchronous Express Workflows allow developers to receive a workflow response without having to poll additional services. This helps developers orchestrate microservices without needing to write additional code to handle errors, retries, and run parallel tasks. They can be invoked in response to events such as HTTP requests via API Gateway, from a parent state machine, or by calling the StartSyncExecution API action.
This feature is available in all Regions where AWS Step Functions is available. View the AWS Regions table to learn more.
For more serverless learning resources, visit Serverless Land.
In a recent customer engagement, Quantiphi, Inc., a member of the Amazon Web Services Partner Network, built a solution capable of pre-processing tens of millions of PDF documents before sending them for inference by a machine learning (ML) model. While the customer’s use case—and hence the ML model—was very specific to their needs, the pipeline that does the pre-processing of documents is reusable for a wide array of document processing workloads. This post will walk you through the pre-processing pipeline architecture.
Architectural goals
Quantiphi established the following goals prior to starting:
Loose coupling to enable independent scaling of compute components, flexible selection of compute services, and agility as the customer’s requirements evolved.
Work backwards from business requirements when making decisions affecting scale and throughput and not simply because “fastest is best.” Scale components only where it makes sense and for maximum impact.
Log everything at every stage to enable troubleshooting when something goes wrong, provide a detailed audit trail, and facilitate cost optimization exercises by identifying usage and load of every compute component in the architecture.
Document ingestion
The documents are initially stored in a staging bucket in Amazon Simple Storage Service (Amazon S3). The processing pipeline is kicked off when the “trigger” Amazon Lambda function is called. This Lambda function passes parameters such as the name of the staging S3 bucket and the path(s) within the bucket which are to be processed to the “ingestion app.”
The ingestion app is a simple application that runs a web service to enable triggering a batch and lists documents from the S3 bucket path(s) received via the web service. As the app processes the list of documents, it feeds the document path, S3 bucket name, and some additional metadata to the “ingest” Amazon Simple Queue Service (Amazon SQS) queue. The ingestion app also starts the audit trail for the document by writing a record to the Amazon Aurora database. As the document moves downstream, additional records are added to the database. Records are joined together by a unique ID and assigned to each document by the ingestion app and passed along throughout the pipeline.
Chunking the documents
In order to maximize grip and control, the architecture is built to submit single-page files to the ML model. This enables correlating an inference failure to a specific page instead of a whole document (which may be many pages long). It also makes identifying the location of features within the inference results an easier task. Since the documents being processed can have varied sizes, resolutions, and page count, a big part of the pre-processing pipeline is to chunk a document up into its component pages prior to sending it for inference.
The “chunking orchestrator” app repeatedly pulls a message from the ingest queue and retrieves the document named therein from the S3 bucket. The PDF document is then classified along two metrics:
File size
Number of pages
We use these metrics to determine which chunking queue the document is sent to:
Large: Greater than 10MB in size or greater than 10 pages
Small: Less than or equal to 10MB and less than or equal to 10 pages
Single page: Less than or equal to 10MB and exactly one page
Each of these queues is serviced by an appropriately sized compute service that breaks the document down into smaller pieces, and ultimately, into individual pages.
Amazon Elastic Cloud Compute (EC2) processes large documents primarily because of the high memory footprint needed to read large, multi-gigabyte PDF files into memory. The output from these workers are smaller PDF documents that are stored in Amazon S3. The name and location of these smaller documents is submitted to the “small documents” queue.
Small documents are processed by a Lambda function that decomposes the document into single pages that are stored in Amazon S3. The name and location of these single page files is sent to the “single page” queue.
The Dead Letter Queues (DLQs) are used to hold messages from their respective size queue which are not successfully processed. If messages start landing in the DLQs, it’s an indication that there is a problem in the pipeline. For example, if messages start landing in the “small” or “single page” DLQ, it could indicate that the Lambda function processing those respective queues has reached its maximum run time.
An Amazon CloudWatch Alarm monitors the depth of each DLQ. Upon seeing DLQ activity, a notification is sent via Amazon Simple Notification Service (Amazon SNS) so an administrator can then investigate and make adjustments such as tuning the sizing thresholds to ensure the Lambda functions can finish before reaching their maximum run time.
In order to ensure no documents are left behind in the active run, there is a failsafe in the form of an Amazon EC2 worker that retrieves and processes messages from the DLQs. This failsafe app breaks a PDF all the way down into individual pages and then does image conversion.
For documents that don’t fall into a DLQ, they make it to the “single page” queue. This queue drives each page through the “image conversion” Lambda function which converts the single page file from PDF to PNG format. These PNG files are stored in Amazon S3.
Sending for inference
At this point, the documents have been chunked up and are ready for inference.
When the single-page image files land in Amazon S3, an S3 Event Notification is fired which places a message in a “converted image” SQS queue which in turn triggers the “model endpoint” Lambda function. This function calls an API endpoint on an Amazon API Gateway that is fronting the Amazon SageMaker inference endpoint. Using API Gateway with SageMaker endpoints avoided throttling during Lambda function execution due to high volumes of concurrent calls to the Amazon SageMaker API. This pattern also resulted in a 2x inference throughput speedup. The Lambda function passes the document’s S3 bucket name and path to the API which in turn passes it to the auto scaling SageMaker endpoint. The function reads the inference results that are passed back from API Gateway and stores them in Amazon Aurora.
The inference results as well as all the telemetry collected as the document was processed can be queried from the Amazon Aurora database to build reports showing number of documents processed, number of documents with failures, and number of documents with or without whatever feature(s) the ML model is trained to look for.
Summary
This architecture is able to take PDF documents that range in size from single page up to thousands of pages or gigabytes in size, pre-process them into single page image files, and then send them for inference by a machine learning model. Once triggered, the pipeline is completely automated and is able to scale to tens of millions of pages per batch.
In keeping with the architectural goals of the project, Amazon SQS is used throughout in order to build a loosely coupled system which promotes agility, scalability, and resiliency. Loose coupling also enables a high degree of grip and control over the system making it easier to respond to changes in business needs as well as focusing tuning efforts for maximum impact. And with every compute component logging everything it does, the system provides a high degree of auditability and introspection which facilitates performance monitoring, and detailed cost optimization.
This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the nine serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the Introduction post for a table of contents and explaining the example application.
Question OPS1: How do you evaluate your serverless application’s health?
This post continues part 1 of this Operational Excellence question. Previously, I covered using Amazon CloudWatch out-of-the-box standard metrics and alerts, and structured and centralized logging with the Embedded Metrics Format.
Best practice: Use application, business, and operations metrics
Identifying key performance indicators (KPIs), including business, customer, and operations outcomes, in additional to application metrics, helps to show a higher-level view of how the application and business is performing.
Business KPIs measure your application performance against business goals. For example, if fewer flight reservations are flowing through the system, the business would like to know.
Customer experience KPIs highlight the overall effectiveness of how customers use the application over time. Examples are perceived latency, time to find a booking, make a payment, etc.
Operational metrics help to see how operationally stable the application is over time. Examples are continuous integration/delivery/deployment feedback time, mean-time-between-failure/recovery, number of on-call pages and time to resolution, etc.
Custom Metrics
Embedded Metrics Format can also emit custom metrics to help understand your workload health’s impact on business.
And within the except statement within the try/except block, I emit a metric for a failed booking:
metrics.put_metric("BookingFailed", 1, "Count")
Once I make a booking, within the CloudWatch console, I navigate to Logs | Log groups and select the Airline-ConfirmBooking-develop group. I select a log stream and find the custom metric as part of the structured log entry.
Custom metric structured log entry
I can also create a custom metric graph. Within the CloudWatch console, I navigate to Metrics. I see the ServerlessAirlineEMF Custom Namespace is available.
Custom metric namespace
I select the Namespace and the available metric.
Available metric
I select a Line graph, add a name, and see the successful booking now plotted on the graph.
Once a booking is made, within the CloudWatch console, I navigate to Logs | Insights. I select the /aws/lambda/Airline-ConfirmBooking-develop log group. I choose Run query which shows a list of discovered fields on the right of the console.
I can search for discovered booking related fields.
I then enter the following in the query pane to search the logs and plot the sum of BookingReference and choose Run Query:
There are a number of other component metrics that are important to measure. Track interactions between upstream and downstream components such as message queue length, integration latency, and throttling.
Improvement plan summary:
Identify user journeys and metrics from each customer transaction.
Create custom metrics asynchronously instead of synchronously for improved performance, cost, and reliability outcomes.
Emit business metrics from within your workload to measure application performance against business goals.
Create and analyze component metrics to measure interactions with upstream and downstream components.
Create and analyze operational metrics to assess the health of your continuous delivery pipeline and operational processes.
Good practice: Use distributed tracing and code is instrumented with additional context
Logging provides information on the individual point in time events the application generates. Tracing provides a wider continuous view of an application. Tracing helps to follow a user journey or transaction through the application.
AWS X-Ray is an example of a distributed tracing solution, there are a number of third-party options as well. X-Ray collects data about the requests that your application serves and also builds a service graph, which shows all the components of an application. This provides visibility to understand how upstream/downstream services may affect workload health.
For the most comprehensive view, enable X-Ray across as many services as possible and include X-Ray tracing instrumentation in code. This is the list of AWS Services integrated with X-Ray.
X-Ray receives data from services as segments. Segments for a common request are then grouped into traces. Segments can be further broken down into more granular subsegments. Custom data key-value pairs are added to segments and subsegments with annotations and metadata. Traces can also be grouped which helps with filter expressions.
AWS Lambda instruments incoming requests for all supported languages. Lambda application code can be further instrumented to emit information about its status, correlation identifiers, and business outcomes to determine transaction flows across your workload.
X-Ray tracing for a Lambda function is enabled in the Lambda Console. Select a Lambda function. I select the Airline-ReserveBooking-develop function. In the Configuration pane, I select to enable X-Ray Active tracing.
X-Ray tracing enabled
X-Ray can also be enabled via CloudFormation with the following code:
TracingConfig:
Mode: Active
Lambda IAM permissions to write to X-Ray are added automatically when active tracing is enabled via the console. When using CloudFormation, Allow the Actions xray:PutTraceSegments and xray:PutTelemetryRecords
It is important to understand what invocations X-Ray does trace. X-Ray applies a sampling algorithm. If an upstream service, such as API Gateway with X-Ray tracing enabled, has already sampled a request, the Lambda function request is also sampled. Without an upstream request, X-Ray traces data for the first Lambda invocation each second, and then 5% of additional invocations.
For the airline application, X-Ray tracing is initiated within the shared library with the code:
from aws_xray_sdk.core import models, patch_all, xray_recorder
Segments, subsegments, annotations, and metadata are added to functions with the following example code:
segment = xray_recorder.begin_segment('segment_name')
# Start a subsegment
subsegment = xray_recorder.begin_subsegment('subsegment_name')
# Add metadata and annotations
segment.put_metadata('key', dict, 'namespace')
subsegment.put_annotation('key', 'value')
# Close the subsegment and segment
xray_recorder.end_subsegment()
xray_recorder.end_segment()
For example, within the collect payment module, an annotation is added for a successful payment with:
tracer.put_annotation("PaymentStatus", "SUCCESS")
CloudWatch ServiceLens
Once a booking is made and payment is successful, the tracing is available in the X-Ray console.
I can visualize all application resources and dependencies where X-Ray is enabled. I can trace performance or availability issues. If there was an issue connecting to SNS for example, this would be shown.
I select the Airline-CollectPayment-develop node and can view the out-of-the-box standard Lambda metrics.
I can select View Logs to jump to the CloudWatch Logs Insights console.
CloudWatch Insights Service map
I select View dashboard to see the function metrics, node map, and function details.
CloudWatch Insights Service Map dashboard
I select View traces and can filter by the custom metric PaymentStatus. I select SUCCESS and chose Add to filter. I then select a trace.
CloudWatch Insights Filtered traces
I see the full trace details, which show the full application transaction of a payment collection.
Segments timeline
Selecting the Lambda handler subsegment – ## lambda_handler, I can view the trace Annotations and Metadata, which include the business transaction details such as Customer and PaymentStatus.
X-Ray annotations
Trace groups are another feature of X-Ray and ServiceLens. Trace groups use filter expressions such as Annotation.PaymentStatus = "FAILED" which are used to view traces that match the particular group. Service graphs can also be viewed, and CloudWatch alarms created based on the group.
CloudWatch ServiceLens provides powerful capabilities to understand application performance bottlenecks and issues, helping determine how users are impacted.
Improvement plan summary:
Identify common business context and system data that are commonly present across multiple transactions.
Instrument SDKs and requests to upstream/downstream services to understand the flow of a transaction across system components.
Recent announcements
There have been a number of recent announcements for X-Ray and CloudWatch to improve how to evaluate serverless application health.
Evaluating application health helps you identify which services should be optimized to improve your customer’s experience. In part 1, I cover out-of-the-box standard metrics and alerts, as well as structured and centralized logging. In this post, I explore custom metrics and distributed tracing and show how to use ServiceLens to view logs, metrics, and traces together.
In an upcoming post, I will cover the next Operational Excellence question from the Well-Architected Serverless Lens – Approaching application lifecycle management.
This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the nine serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the Introduction post for a table of contents and explaining the example application.
Question OPS1: How do you evaluate your serverless application’s health?
Evaluating your metrics, distributed tracing, and logging gives you insight into business and operational events, and helps you understand which services should be optimized to improve your customer’s experience. By understanding the health of your Serverless Application, you will know whether it is functioning as expected, and can proactively react to any signals that indicate it is becoming unhealthy.
Required practice: Understand, analyze, and alert on metrics provided out of the box
It is important to understand metrics for every AWS service used in your application so you can decide how to measure its behavior. AWS services provide a number of out-of-the-box standard metrics to help monitor the operational health of your application.
As these metrics are generated automatically, it is a simple way to start monitoring your application and can also be augmented with custom metrics.
When I make a booking, as shown in the Introduction post, AWS services emit metrics to Amazon CloudWatch. These are processed asynchronously without impacting the application’s performance.
There are two default CloudWatch dashboards to visualize key metrics quickly: per service and cross service.
Per service
To view the per service metrics dashboard, I open the CloudWatch console.
I select a service where Overview is shown, such as Lambda. Now I can view the metrics for all Lambda functions in the account.
Cross service
To see an overview of key metrics across all AWS services, open the CloudWatch console and choose View cross service dashboard.
I see a list of all services with one or two key metrics displayed. This provides a good overview of all services your application uses.
Alerting
The next stage is to identify the key metrics for comparison and set up alerts for under- and over-performing services. Here are some recommended metrics to alarm on for a number of AWS services.
To configure a manual alert for Lambda function errors using CloudWatch Alarms:
I open the CloudWatch console and select Alarms and select Create Alarm.
I choose Select Metric and from AWS Namespaces, select Lambda, Across All Functions and select Errors and select Select metric.
I change the Statistic to Sum and the Period to 1 minute.
Under Conditions, I select a Static threshold Greater than 1 and select Next.
Alarms can also be created using anomaly detection rather than static values if there is a discernible pattern or trend. Anomaly detection looks at past metric data and uses machine learning to create a model of expected values. Alerts can then be configured if they fall outside this band of “normal” values. I use a Static threshold for this alarm.
For the notification, I set the trigger to alarm to an existing SNS topic with my email address, then choose Next.
I enter a descriptive alarm name such as serverlessairline-lambda-prod-errors > 1, select Next, and choose Create alarm.
I have now manually set up an alarm.
Use CloudWatch composite alarms to combine multiple alarms to reduce noise and focus on critical issues. For example, a single alarm could trigger if there are both Lambda function errors as well as high Lambda concurrent executions.
I view the out of the box standard metrics and in this example, manually create an alarm for Lambda function errors.
Improvement plan summary:
Understand what metrics and dimensions each managed service used provides.
Configure alerts on relevant metrics for when services are unhealthy.
Good practice: Use structured and centralized logging
Central logging provides a single place to search and analyze logs. Structured logging means selecting a consistent log format and content structure to simplify querying across multiple components.
To identify a business transaction across components, such as a particular flight booking, log operational information from upstream and downstream services. Add information such as customer_id along with business outcomes such as order=accepted or order=confirmed. Make sure you are not logging any sensitive or personal identifying data in any logs.
Use JSON as your logging output format. Log multiple fields in a single object or dictionary rather than many one line messages for simpler searching.
The airline booking component, which is written in Python, currently uses a shared library with a separate log processing stack.
Embedded Metrics Format is a simpler mechanism to replace the shared library and use structured logging. CloudWatch Embedded Metrics adds environmental metadata such as Lambda Function version and also automatically extracts custom metrics so you can visualize and alarm on them. There are open-source client libraries available for Node.js and Python.
I then add embedded metrics to the individual confirm booking module with the following steps:
In this example I also log the entire incoming event details.
metrics.set_property("event", event)
It is best practice to only log what is required to avoid unnecessary costs. Ensure the event does not have any sensitive or personal identifying data which is available to anyone who has access to the logs.
To avoid the duplicate logging in this example airline application which adds cost, I remove the existing shared library logger.*() lines.
When I make a booking, the CloudWatch log message is in structured JSON format. It contains the properties I set event, BookingReference, as well as function metadata.
I can then search for all log activity related to a specific booking across multiple functions with booking_id. I can track customer activity across multiple bookings using customer_id.
Logging is often created as a shared library resource which all functions reference. Another option is using Lambda Layers, which lets functions import additional code such as external libraries. Multiple functions can share this code.
Improvement plan summary:
Log request identifiers from downstream services, component name, component runtime information, unique correlation identifiers, and information that helps identify a business transaction.
Use JSON as the logging output. Prefer logging entire objects/dictionaries rather than many one line messages. Mask or remove sensitive data when logging.
Minimize logging debugging information to a minimum as they can incur both costs and increase noise to signal ratio
Conclusion
Evaluating serverless application health helps understand which services should be optimized to improve your customer’s experience. I cover out of the box metrics and alerts, as well as structured and centralized logging.
This well-architected question will be continued in an upcoming post where I look at custom metrics and distributed tracing.
You can now develop your AWS Lambda functions using Ruby 2.7. Start using this runtime today by specifying a runtime parameter value of ruby2.7 when creating or updating Lambda functions.
New Ruby runtime features
Ruby 2.7 is a stable release and brings several new features, including pattern matching, argument forwarding, and numbered arguments.
Pattern matching
Pattern matching, a widely used feature in functional programming languages, is introduced as a new experimental feature. This allows deep matching of structured values, checking the structure and binding the matched parts to local variables.
It can traverse a given object and assign it’s value if it matches a pattern:
require "json"
json = <<END
{
"name": "Alice",
"age": 30,
"children": [{ "name": "Bob", "age": 2 }]
}
END
case JSON.parse(json, symbolize_names: true)
in {name: "Alice", children: [{name: "Bob", age: age}]}
p age #=> 2
end
For additional information on pattern matching, see Feature #14912.
Argument forwarding
Prior to Ruby 2.7 the * and ** operators are available for single and keyword arguments. These are used to specify any number of arguments or convert array or hashes to several arguments.
Ruby 2.7 added a new shorthand syntax ... for forwarding all arguments to a method irrespective of type. In the example below, all arguments to foo are forwarded to bar, including keyword and block arguments. It acts similar to calling super without any arguments.
def foo(...)
bar(...)
end
Numbered arguments
Numbered arguments allow you to reference block arguments solely by their index. They are only valid when referenced inside of a block:
Before:
[1, 2, 3].each { |i| puts i }
After:
[1, 2, 3].each { puts @1 }
This can make short code blocks easier to read and reduce code repetition.
Amazon Linux 2
Ruby 2.7, like (Python 3.8, Node.js 10 and 12, and Java 11) is based on an Amazon Linux 2 execution environment. Amazon Linux 2 provides a secure, stable, and high-performance execution environment to develop and run cloud and enterprise applications.
Next steps
Get started building with Ruby 2.7 today by specifying a runtime parameter value of ruby2.7 when creating your Lambda functions. You can read about the Ruby programming model in the AWS Lambda documentation to learn more about writing functions in Ruby 2.7.
For existing Ruby functions, migrate to the new runtime by making any necessary changes to the code for compatibility with Ruby 2.7, then changing the function’s runtime configuration to ruby2.7.
Using a simple Arduino sketch, an AWS Serverless Application Repository application, and a microcontroller, you can build a basic serverless workflow for communicating with an AWS IoT Core device.
A microcontroller is a programmable chip and acts as the brain of an electronic device. It has input and output pins for reading and writing on digital or analog components. Those components could be sensors, relays, actuators, or various other devices. It can be used to build remote sensors, home automation products, robots, and much more. The ESP32 is a powerful low-cost microcontroller with Wi-Fi and Bluetooth built in and is used this walkthrough.
The Arduino IDE, a lightweight development environment for hardware, now includes support for the ESP32. There is a large collection of community and officially supported libraries, from addressable LED strips to spectral light analysis.
The following walkthrough demonstrates connecting an ESP32 to AWS IoT Core to allow it to publish and subscribe to topics. This means that the device can send any arbitrary information, such as sensor values, into AWS IoT Core while also being able to receive commands.
Solution overview
This post walks through deploying an application from the AWS Serverless Application Repository. This allows an AWS IoT device to be messaged using a REST endpoint powered by Amazon API Gateway and AWS Lambda. The AWS SAR application also configures an AWS IoT rule that forwards any messages published by the device to a Lambda function that updates an Amazon DynamoDB table, demonstrating basic bidirectional communication.
The last section explores how to build an IoT project with real-world application. By connecting a thermal printer module and modifying a few lines of code in the example firmware, the ESP32 device becomes an AWS IoT–connected printer.
All of this can be accomplished within the AWS Free Tier, which is necessary for the following instructions.
An example of an AWS IoT project using an ESP32, AWS IoT Core, and an Arduino thermal printer.
Required steps
To complete the walkthrough, follow these steps:
Create an AWS IoT device.
Install and configure the Arduino IDE.
Configure and flash an ESP32 IoT device.
Deploying the lambda-iot-rule AWS SAR application.
Monitor and test.
Create an IoT thermal printer.
Creating an AWS IoT device
To communicate with the ESP32 device, it must connect to AWS IoT Core with device credentials. You must also specify the topics it has permissions to publish and subscribe on.
In the AWS IoT console, choose Register a new thing, Create a single thing.
Name the new thing. Use this exact name later when configuring the ESP32 IoT device. Leave the remaining fields set to their defaults. Choose Next.
Choose Create certificate. Only the thing cert, private key, and Amazon Root CA 1 downloads are necessary for the ESP32 to connect. Download and save them somewhere secure, as they are used when programming the ESP32 device.
Choose Activate, Attach a policy.
Skip adding a policy, and choose Register Thing.
In the AWS IoT console side menu, choose Secure, Policies, Create a policy.
Name the policy Esp32Policy. Choose the Advanced tab.
Replace REGION with the matching AWS Region you’re currently operating in. This can be found on the top right corner of the AWS console window.
Replace ACCOUNT_ID with your own, which can be found in Account Settings.
Replace THINGNAME with the name of your device.
Choose Create.
In the AWS IoT console, choose Secure, Certification. Select the one created for your device and choose Actions,Attach policy.
Choose Esp32Policy, Attach.
Your AWS IoT device is now configured to have permission to connect to AWS IoT Core. It can also publish to the topic esp32/pub and subscribe to the topic esp32/sub. For more information on securing devices, see AWS IoT Policies.
Installing and configuring the Arduino IDE
The Arduino IDE is an open-source development environment for programming microcontrollers. It supports a continuously growing number of platforms including most ESP32-based modules. It must be installed along with the ESP32 board definitions, MQTT library, and ArduinoJson library.
For Additional Board Manager URLs, add https://dl.espressif.com/dl/package_esp32_index.json.
Choose Tools, Board, Boards Manager.
Search esp32 and install the latest version.
Choose Sketch, Include Library, Manage Libraries.
Search MQTT, and install the latest version by Joel Gaehwiler.
Repeat the library installation process for ArduinoJson.
The Arduino IDE is now installed and configured with all the board definitions and libraries needed for this walkthrough.
Configuring and flashing an ESP32 IoT device
A collection of various ESP32 development boards.
For this section, you need an ESP32 device. To check if your board is compatible with the Arduino IDE, see the boards.txt file. The following code connects to AWS IoT Core securely using MQTT, a publish and subscribe messaging protocol.
This project has been tested on the following devices:
Install the required serial drivers for your device. Some boards use different USB/FTDI chips for interfacing. Here are the most commonly used with links to drivers.
Enter the name of your AWS IoT thing, as it is in the console, in the field THINGNAME.
To connect to Wi-Fi, add the SSID and PASSWORD of the desired network. Note: The network name should not include spaces or special characters.
The AWS_IOT_ENDPOINT can be found from the Settings page in the AWS IoT console.
Copy the Amazon Root CA 1, Device Certificate, and Device Private Key to their respective locations in the secrets.h file.
Choose the tab for the main sketch file, and paste the following.
#include "secrets.h"
#include <WiFiClientSecure.h>
#include <MQTTClient.h>
#include <ArduinoJson.h>
#include "WiFi.h"
// The MQTT topics that this device should publish/subscribe
#define AWS_IOT_PUBLISH_TOPIC "esp32/pub"
#define AWS_IOT_SUBSCRIBE_TOPIC "esp32/sub"
WiFiClientSecure net = WiFiClientSecure();
MQTTClient client = MQTTClient(256);
void connectAWS()
{
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
Serial.println("Connecting to Wi-Fi");
while (WiFi.status() != WL_CONNECTED){
delay(500);
Serial.print(".");
}
// Configure WiFiClientSecure to use the AWS IoT device credentials
net.setCACert(AWS_CERT_CA);
net.setCertificate(AWS_CERT_CRT);
net.setPrivateKey(AWS_CERT_PRIVATE);
// Connect to the MQTT broker on the AWS endpoint we defined earlier
client.begin(AWS_IOT_ENDPOINT, 8883, net);
// Create a message handler
client.onMessage(messageHandler);
Serial.print("Connecting to AWS IOT");
while (!client.connect(THINGNAME)) {
Serial.print(".");
delay(100);
}
if(!client.connected()){
Serial.println("AWS IoT Timeout!");
return;
}
// Subscribe to a topic
client.subscribe(AWS_IOT_SUBSCRIBE_TOPIC);
Serial.println("AWS IoT Connected!");
}
void publishMessage()
{
StaticJsonDocument<200> doc;
doc["time"] = millis();
doc["sensor_a0"] = analogRead(0);
char jsonBuffer[512];
serializeJson(doc, jsonBuffer); // print to client
client.publish(AWS_IOT_PUBLISH_TOPIC, jsonBuffer);
}
void messageHandler(String &topic, String &payload) {
Serial.println("incoming: " + topic + " - " + payload);
// StaticJsonDocument<200> doc;
// deserializeJson(doc, payload);
// const char* message = doc["message"];
}
void setup() {
Serial.begin(9600);
connectAWS();
}
void loop() {
publishMessage();
client.loop();
delay(1000);
}
Choose File, Save, and give your project a name.
Flashing the ESP32
Plug the ESP32 board into a USB port on the computer running the Arduino IDE.
Choose Tools, Board, and then select the matching type of ESP32 module. In this case, a Sparkfun ESP32 Thing was used.
Choose Tools, Port, and then select the matching port for your device.
Choose Upload. Arduino reads Done uploading when the upload is successful.
Choose the magnifying lens icon to open the Serial Monitor. Set the baud rate to 9600.
Keep the Serial Monitor open. When connected to Wi-Fi and then AWS IoT Core, any messages received on the topic esp32/sub are logged to this console. The device is also now publishing to the topic esp32/pub.
// The MQTT topics that this device should publish/subscribe
#define AWS_IOT_PUBLISH_TOPIC "esp32/pub"
#define AWS_IOT_SUBSCRIBE_TOPIC "esp32/sub"
Within this sketch, the relevant functions are publishMessage() and messageHandler().
The publishMessage() function creates a JSON object with the current time in milliseconds and the analog value of pin A0 on the device. It then publishes this JSON object to the topic esp32/pub.
The messageHandler() function prints out the topic and payload of any message from a subscribed topic. To see all the ways to parse JSON messages in Arduino, see the deserializeJson() example.
Additional topic subscriptions can be added within the connectAWS() function by adding another line similar to the following.
// Subscribe to a topic
client.subscribe(AWS_IOT_SUBSCRIBE_TOPIC);
Serial.println("AWS IoT Connected!");
Deploying the lambda-iot-rule AWS SAR application
Now that an ESP32 device has been connected to AWS IoT, the following steps walk through deploying an AWS Serverless Application Repository application. This is a base for building serverless integration with a physical device.
On the lambda-iot-rule AWS Serverless Application Repository application page, make sure that the Region is the same as the AWS IoT device.
Choose Deploy.
Under Application settings, for PublishTopic, enter esp32/sub. This is the topic to which the ESP32 device is subscribed. It receives messages published to this topic. Likewise, set SubscribeTopic to esp32/pub, the topic on which the device publishes.
Choose Deploy.
When creation of the application is complete, choose Test app to navigate to the application page. Keep this page open for the next section.
Monitoring and testing
At this stage, two Lambda functions, a DynamoDB table, and an AWS IoT rule have been deployed. The IoT rule forwards messages on topic esp32/pub to TopicSubscriber, a Lambda function, which inserts the messages on to the DynamoDB table.
On the application page, under Resources, choose MyTable. This is the DynamoDB table that the TopicSubscriber Lambda function updates.
Choose Items. If the ESP32 device is still active and connected, messages that it has published appear here.
The TopicPublisher Lambda function is invoked by the API Gateway endpoint and publishes to the AWS IoT topic esp32/sub.
1. On the application page, find the Application endpoint.
2. To test that the TopicPublisher function is working, enter the following into a terminal or command-line utility, replacing ENDPOINT with the URL from above.
curl -d '{"text":"Hello world!"}' -H "Content-Type: application/json" -X POST https://ENDPOINT/publish
Upon success, the request returns a copy of the message.
Back in the Serial Monitor, the message published to the topic esp32/sub prints out.
Creating an IoT thermal printer
With the completion of the previous steps, the ESP32 device currently logs incoming messages to the serial console.
The following steps demonstrate how the code can be modified to use incoming messages to interact with a peripheral component. This is done by wiring a thermal printer to the ESP32 in order to physically print messages. The REST endpoint from the previous section can be used as a webhook in third-party applications to interact with this device.
A wiring diagram depicting an ESP32 connected to a thermal printer.
Follow the product instructions for powering, wiring, and installing the correct Arduino library.
Ensure that the thermal printer is working by holding the power button on the printer while connecting the power. A sample receipt prints. On that receipt, the default baud rate is specified as either 9600 or 19200.
In the Arduino code from earlier, include the following lines at the top of the main sketch file. The second line defines what interface the thermal printer is connected to. &Serial2 is used to set the third hardware serial interface on the ESP32. For this example, the pins on the Sparkfun ESP32 Thing, GPIO16/GPIO17, are used for RX/TX respectively.
Replace the setup() function with the following to initialize the printer on device bootup. Change the baud rate of Serial2.begin() to match what is specified in the test print. The default is 19200.
After the firmware has successfully uploaded, open the Serial Monitor to confirm that the board has connected to AWS IoT.
Enter the following into a command-line utility, replacing ENDPOINT, as in the previous section. curl -d '{"message": "Hello World!"}' -H "Content-Type: application/json" -X POST https://ENDPOINT/publish
If successful, the device prints out the message “Hello World” from the attached thermal printer. This is a fully serverless IoT printer that can be triggered remotely from a webhook. As an example, this can be used with GitHub Webhooks to print a physical readout of events.
Conclusion
Using a simple Arduino sketch, an AWS Serverless Application Repository application, and a microcontroller, this post demonstrated how to build a basic serverless workflow for communicating with a physical device. It also showed how to expand that into an IoT thermal printer with real-world applications.
With the use of AWS serverless, advanced compute and extensibility can be added to an IoT device, from machine learning to translation services and beyond. By using the Arduino programming environment, the vast collection of open-source libraries, projects, and code examples open up a world of possibilities. The next step is to explore what can be done with an Arduino and the capabilities of AWS serverless. The sample Arduino code for this project and more can be found at this GitHub repository.
The AWS Serverless Application Model (SAM) CLI provides developers with a local tool for managing serverless applications on AWS. The command line tool allows developers to initialize and configure applications, debug locally using IDEs like Visual Studio Code or JetBrains WebStorm, and deploy to the AWS Cloud.
On November 25, we announced improvements to the deployment process using the SAM CLI. These improvements allow users to deploy serverless applications with less manual setup, fewer repeated steps, and shorter CLI commands.
Previously, developers had to manually create and manage an Amazon S3 bucket to host deployment artifacts for each desired Region. With this latest release, the SAM CLI automatically creates a Region-specific bucket via AWS CloudFormation, based on your local AWS credentials. If you deploy an application to a Region where no bucket exists, a new managed bucket is created in the new Region.
Minimized deployment commands
Before this update, a minimal deployment process would look like this:
sam package --s3-bucket my-regional-bucket --output-template-file out.yaml
sam deploy --template-file out.yaml --capabilities CAPABILITY_IAM --stack-name MyStackName
This series of commands was required at every deployment. With this latest update to SAM CLI, the package and deployment commands have been combined. The syntax is now:
sam deploy
The guided deployment
How does SAM CLI know where to deploy and what to name the application? The answer to this is found in the “guided deployment.” This is an interactive version of the deployment process that collects and saves information needed to deploy the application.
If sam deploy is running and cannot find the required information for deployment, the process errors out, recommending that the guided deployment process be run. To use the guided process:
sam deploy -g or --guided
Once the information is collected, it is saved in the application as the samconfig.toml file. Subsequent calls to sam deploy use the existing data to deploy. If you update a setting between deployments, run the sam deploy -g command again to update the stored values.
Frequently asked questions
How many buckets are created?
When you run the sam deploy -g command with provided values, SAM checks the account for an existing SAM deployment bucket in that Region. This Regional bucket is created via CloudFormation by SAM as an artifact repository for all applications for the current account in the current Region. For a root level account, there is only a single bucket per Region that contains deployed SAM serverless applications.
What if the Region is changed for the application?
If you change the Region in samconfig.toml before running sam deploy, the process errors out. The selected deployment Region does not match the artifacts bucket Region stored in the samconfig.toml file. The error also occurs if you use the –region flag, and a Region is different to the Region in the samconfig.toml file. To change the Region for a deployment, use the sam deploy -g option to update the Region. SAM verifies that a bucket for the new Region exists, or creates one automatically.
What if the samconfig.toml file is deleted?
If the samconfig.toml file is deleted, SAM treats the application as new. We recommend that you use the -g flag to reconfigure the application.
What about backwards compatibility?
If you are using SAM for a non-interactive deployment, it is possible to pass all required information as parameters. For example, for a continuous integration continuous delivery (CICD) pipeline:
This same deployment is achieved using the older process with the following commands:
sam package --s3-bucket aws-sam-cli-managed-default-samclisourcebucket-xic3fipuh9n9 --output-template-file out.yaml
sam deploy --template-file out.yaml --capabilities CAPABILITY_IAM --stack-name sam-app --region us-west-2
The package command still exists in the latest version of SAM CLI for backwards compatibility with existing CICD processes.
Updated user experience
Along a streamlined process for deploying applications, the new version of SAM CLI brings an improved user interface. This provides developers with more feedback and validation choices. First, during the deployment process, all deployment parameters are displayed:
Once the changeset is created, the developer is presented with all the proposed changes.
Developers also have the option to confirm the changes, or cancel the deployment. This option is a setting in the samconfig.toml file that can be turned on or off as needed.
As the changeset is applied, the console displays the changes being made in the AWS Cloud.
Finally, the resulting output is displayed.
Conclusion
By streamlining the deployment process, removing the need to manage an S3 bucket, and providing clear deployment feedback and data, the latest version SAM CLI makes serverless development easier for developers.
In North America, approximately 95% of adults over the age of 25 have a bank account. In the developing world, that number is only about 52%. Cryptocurrencies can provide a platform for millions of unbanked people in the world to achieve financial freedom on a more level financial playing field.
Electroneum, a cryptocurrency company located in England, built its cryptocurrency mobile back end on AWS and is using the power of blockchain to unlock the global digital economy for millions of people in the developing world.
Electroneum’s cryptocurrency mobile app allows Electroneum customers in developing countries to transfer ETNs (exchange-traded notes) and pay for goods using their smartphones. Listen in to the discussion between AWS Solutions Architect Toby Knight and Electroneum CTO Barry Last as they explain how the company built its solution. Electroneum’s app is a web application that uses a feedback loop between its web servers and AWS WAF (a web application firewall) to automatically block malicious actors. The system then uses Athena, with a gamified approach, to provide an additional layer of blocking to prevent DDoS attacks. Finally, Electroneum built a serverless, instant payments system using AWS API Gateway, AWS Lambda, and Amazon DynamoDB to help its customers avoid the usual delays in confirming cryptocurrency transactions.
Assume that you make snapshot copies or read-replicas of your RDS databases in a secondary or backup AWS Region as a best practice. By using AWS Secrets Manager, you can store your RDS database credentials securely using AWS KMS Customer Master Keys, otherwise known as CMKs. AWS Key Management Service (AWS KMS) ensures secrets are encrypted at rest. With the integration of AWS Lambda, you can now more easily rotate these credentials regularly and replicate them for disaster recovery situations. This automation keeps credentials stored in AWS Secrets Manager for Amazon Relational Database Service (Amazon RDS) in sync between the origin Region, where your AWS RDS database lives, and the replica Region where your read-replicas live. While using the same credentials for all databases is not ideal, in the instance of disaster recovery, it can be useful for a quicker recovery.
In this post, I show you how to set up secret replication using an AWS CloudFormation template to create an AWS Lambda Function. By replicating secrets across AWS Regions, you can reduce the time required to get back up and running in production in a disaster recovery situation by ensuring your credentials are securely stored in the replica Region as well.
Solution overview
The solution described in this post uses a combination of AWS Secrets Manager, AWS CloudTrail, Amazon CloudWatch Events, and AWS Lambda. You create a secret in Secrets Manager that houses your RDS database credentials. This secret is encrypted using AWS KMS. Lambda automates the replication of the secret’s value in your origin AWS Region by performing a PUT operation on a secret of the same name in the same AWS Region as your read-replica. CloudWatch Events ensures that each time the secret housing your AWS RDS database credentials is rotated, it triggers the Lambda function to copy the secret’s value to your read-replica Region. By doing this, your RDS database credentials are always in sync for recovery.
Note: You might incur charges for using the services used in this solution, including Lambda. For information about potential costs, see the AWS pricing page.
The following diagram illustrates the process covered in this post.
Secrets Manager rotates a secret in your original AWS Region.
CloudTrail receives a log with “eventName”: “RotationSuceeded”.
CloudTrail passes this log to CloudWatch Events.
A filter in CloudWatch Events for this EventName triggers a Lambda function.
The Lambda function retrieves the secret value from the origin AWS Region.
The Lambda function then performs PutSecretValue on a secret with the same name in the replica AWS Region.
The Lambda function is triggered by a CloudWatch Event passed by CloudTrail. The triggering event is raised whenever a secret successfully rotates, which creates a CloudTrail log with the EventName property set to RotationSucceeded. You will know the secret rotation was successful when it has the label AWSCURRENT. You can read more about the secret labels and how they change during the rotation process here. The Lambda function retrieves the new secret, then calls PutSecretValue on a secret with the same name in the replica AWS Region. This AWS Region is specified by an environment variable inside the Lambda function.
Note: If the origin secret uses a customer-managed Customer Master Key (CMK), then the cloned secret must, as well. If the origin secret uses an AWS-managed CMK, then the cloned secret must, as well. You can’t mix them or the Lambda function will fail. AWS recommends you use customer-managed CMKs because you have full control of the permissions regarding which entities can use the CMK.
The CloudFormation template also creates an AWS Identity and Access Management (IAM) role with the required permissions to create and update secret replicas. Next, you’ll launch the CloudFormation template by using the AWS CloudFormation CLI.
Deploy the solution
Now that you understand how the Lambda function copies your secrets to the replica AWS Region, I’ll explain the commands to launch your CloudFormation stack. This stack creates the necessary resources to perform the automation. It includes the Lambda function, an IAM role, and the CloudWatch Event trigger.
First, make sure you have credentials for an IAM user or role that can launch all resources included in this template configured on your CLI. To launch the template, run the command below. You’ll choose a unique Stack Name to easily identify its purpose and the URL of the provided template you uploaded to your own S3 Bucket. For the following examples, I will use US-EAST-1 as my origin Region, and EU-WEST-1 as my replica Region. Make sure to replace these values and the other variables (identified by red font) with actual values from your own account.
Verify that the StackStatus shows CREATE_COMPLETE. After you verify this, you’ll see three resources created in your account. The first is the IAM role, which allows the Lambda function to perform its API calls. The name of this role is SecretsManagerRegionReplicatorRole, and can be found in the IAM console under Roles. There are two policies attached to this role. The first policy is the managed permissions policy AWSLambdaBasicExecutionRole, which grants permissions for the Lambda function to write to AWS CloudWatch Logs. These logs will be used further on in the Event Rule creation, which will trigger the cloning of the origin secret to the replication region.
The second policy attached to the SecretsManagerRegionReplicatorRole role is an inline policy that grants permissions to decrypt and encrypt the secret in both your original AWS Region and in the replica AWS Region. This policy also grants permissions to retrieve the secret from the original AWS Region, and to store the secret in the replica AWS Region. You can see an example of this policy granting access to specific secrets below. Should you choose to use this policy, please remember to place your parameters into the placeholder values.
The next resource created is the CloudWatch Events rule SecretsManagerCrossRegionReplicator. You can find this rule by going to the AWS CloudWatch console, and, under Events, selecting Rules. Here’s an example Rule for the EventName I described earlier that’s used to trigger the Lambda function:
The last resource created by the CloudFormation template is the Lambda function, which will do most of the actual work for the replication. After it’s created, you’ll be able to find this resource in your Lambda console with the name SecretsManagerRegionReplicator. You can download a copy of the Python script here, and you can see the full script below. In the function’s environment variables, you’ll also notice the parameter names and values you entered when launching your stack.
import boto3
from os import environ
targetRegion = environ.get('TargetRegion')
if targetRegion == None:
raise Exception('Environment variable "TargetRegion" must be set')
smSource = boto3.client('secretsmanager')
smTarget = boto3.client('secretsmanager', region_name=targetRegion)
def lambda_handler(event, context):
detail = event['detail']
print('Retrieving SecretArn from event data')
secretArn = detail['additionalEventData']['SecretId']
print('Retrieving new version of Secret "{0}"'.format(secretArn))
newSecret = smSource.get_secret_value(SecretId = secretArn)
secretName = newSecret['Name']
currentVersion = newSecret['VersionId']
replicaSecretExists = True
print('Replicating secret "{0}" (Version {1}) to region "{2}"'.format(secretName, currentVersion, targetRegion))
try:
smTarget.put_secret_value(
SecretId = secretName,
ClientRequestToken = currentVersion,
SecretString = newSecret['SecretString']
)
pass
except smTarget.exceptions.ResourceNotFoundException:
print('Secret "{0}" does not exist in target region "{1}". Creating it now with default values'.format(secretName, targetRegion))
replicaSecretExists = False
except smTarget.exceptions.ResourceExistsException:
print('Secret version "{0}" has already been created, this must be a duplicate invocation'.format(currentVersion))
pass
if replicaSecretExists == False:
secretMeta = smSource.describe_secret(SecretId = secretArn)
if secretMeta['KmsKeyId'] != None:
replicaKmsKeyArn = environ.get('ReplicaKmsKeyArn')
if replicaKmsKeyArn == None:
raise Exception('Cannot create replica of a secret that uses a custom KMS key unless the "ReplicaKmsKeyArn" environment variable is set. Alternatively, you can also create the key manually in the replica region with the same name')
smTarget.create_secret(
Name = secretName,
ClientRequestToken = currentVersion,
KmsKeyId = replicaKmsKeyArn,
SecretString = newSecret['SecretString'],
Description = secretMeta['Description']
)
else:
smTarget.create_secret(
Name = secretName,
ClientRequestToken = currentVersion,
SecretString = newSecret['SecretString'],
Description = secretMeta['Description']
)
else:
secretMeta = smTarget.describe_secret(SecretId = secretName)
for previousVersion, labelList in secretMeta['VersionIdsToStages'].items():
if 'AWSCURRENT' in labelList and previousVersion != currentVersion:
print('Moving "AWSCURRENT" label from version "{0}" to new version "{1}"'.format(previousVersion, currentVersion))
smTarget.update_secret_version_stage(
SecretId = secretName,
VersionStage = 'AWSCURRENT',
MoveToVersionId = currentVersion,
RemoveFromVersionId = previousVersion
)
break
print('Secret {0} replicated successfully to region "{1}"'.format(secretName, targetRegion))
Now that your CloudFormation Stack is completed, and all necessary resources set up, you are ready to begin secret replication. To verify the setup works, you can modify the secret in the origin region that houses your RDS database credentials. It will take a few moments for the secret label to return to AWSCURRENT, and then roughly another 5-15 minutes for the CloudTrail Logs to populate and then send the Events to CloudWatch Logs. Once received, the Event will trigger the Lambda function to complete the replication process. You will be able to verify the secret has correctly replicated by going to the Secrets Manager Console in your replication Region, selecting the replicated secret, and viewing the values. If the values are the same in the replicated secret as they are the origin secret, and both labels are AWSCURRENT, you know replication completed successfully and your credentials will be ready if needed.
Summary
In this post, you learned how you can use AWS Lambda and Amazon CloudWatch Events to automate replication of your secrets in AWS Secrets Manager across AWS Regions. You used a CloudFormation template to create the necessary resources for the replication setup. The CloudWatch Event will watch for any CloudTrail Logs which would trigger the Lambda function that then pulls the secret name and value and replicates it to the AWS Region of your choice. Should a disaster occur, you’ll have increased your chances for a smooth recovery of your databases, and you’ll be back in production quicker.
If you have feedback about this blog post, submit comments in the Comments section below. If you have questions about this blog post, start a new thread on the AWS Secrets Manager forum.
Want more AWS Security news? Follow us on Twitter.
In the era of the cloud, hosting a static website is cheaper, faster and simpler than traditional on premise hosting, where you always have to maintain a running server. Basically, no static website is truly static. I can promise you will find at least a “contact us” page in most static websites, which, by their very nature, are dynamically generated. And all businesses need a “contact us” page to help customers connect with business owners for services, inquiries or feedback. In its simplest form, a “contact us” page should collect a user’s basic information (name, e-mail id, phone number, short message and e-mail) and get shared with the business via email when submitted.
AWS provides a simplified way to host your static website in an Amazon S3 bucket using your own custom domain. You can either choose to register a new domain with AWS Route 53 or transfer your domain to Route 53 for hosting in five simple steps.
Obviously, you don’t want to spin-up a server to handle a simple “contact us” form, but it’s a critical element of your website. Luckily, in this post-cloud world, AWS delivers a serverless option. You can use AWS Lambda with Amazon API Gateway to create a serverless backend and use Amazon Simple Email Service to send an e-mail to the business owner whenever a customer submits any inquiry or feedback. Let’s learn how to do it.
Architecture Flow
Here, we are assuming a common website-to-cloud migration scenario, where you have registered your domain name with a 3rd party domain registrar and after migration of your website to Amazon S3. From there, you switched to Amazon Route 53 as your DNS provider. You contacted your DNS provider and updated the name server (NS) record to use the name servers in the delegation that you set in Amazon Route 53 (find step-by-step details in the AWS S3 development guide). Your email server still belongs to your DNS provider as you brought that in the package when you registered your domain with a multi-year contract.
Following is the architecture flow with detailed guidance.
lambdases
In the above diagram, the customer is submitting their inquiry through a “contact us” form, which is hosted in an Amazon S3 bucket as a static website. Information will flow in three simple steps:
Your “contact us” form will collect all user information and post to Amazon API Gateway restful service.
Amazon API Gateway will pass collected user information to an AWS lambda function.
AWS Lambda function will auto generate an e-mail and forward it to your mail server using Amazon SES.
Your “Contact Us” Form
Let’s start with a simple “contact us” form html code snippet:
The above form will ask the user to enter their name, phone, e-mail, and provide a free-form text box to write inquiry/feedback details and includes a submit button.
Later in the post, I’ll share the JQuery code for field validation and the variables to collect values.
Defining AWS Lambda Function
The next step is to create a lambda function, which will get all user information through the API Gateway. The lambda function will look something like this:
The AWS lambda function mailfwd is triggered from the API Gateway POST method, which we will create the next section and send information to Amazon SES for mail forwarding.
Go to the console and click on “Create Function” and select blueprints for hello-world nodejs6.10 version as shown in below screenshot and click on configure button at the bottom.
To create your AWS Lambda function, select the “edit code inline” setting, which will have an editor box with the code in it, and replace that code (making sure to change [email protected] to your real e-mail address and update your actual domain in the response variable):
Now you can execute and test your AWS lambda function as directed in the AWS developer guide. Make sure to update the Lambda execution role and follow the steps provided in the Lambda developer guide to create a basic execution role.
Add following code under policy to allow Amazon SES access to AWS lambda function:
Now, let’s create the API Gateway that will provide a restful API endpoint for our AWS Lambda function, which we are going to create next. We will use this API endpoint to post user-submitted information in the “Contact Us” form — which will also get posted to the AWS Lambda function.
Login to AWS console and select API Gateway. Click on create new API and fill your API name.
Now go to your API name — listed in the left-hand navigation — click on the “actions” drop down, and select “create resource.”
Select your newly-created resource and choose “create method.” Choose a POST. Here, you will choose our AWS Lambda Function. To do this, select “mailfwd” from the drop down.
After saving the form above, Click on the “action” menu and choose “deploy API.” You will see final resources and methods something like below:
Now get your Restful API URL from the “stages” tab as shown in the screenshot below. We will use this URL on our “contact us” HTML page to send the request with all user information.
Make sure to Enable CORS in the API Gateway or you’ll get an error:”Cross-Origin Request Blocked: The Same Origin Policy disallows reading the remote resource at https://abc1234.execute-api.us-east-1.amazonaws.com/02/mailme. (Reason: CORS header ‘Access-Control-Allow-Origin’ missing).”
Setup Amazon SES
Amazon SES requires that you verify your identities (the domains or email addresses that you send email from) to confirm that you own them, and to prevent unauthorized use. Follow the steps outlined in the Amazon SES user guide to verify your sender e-mail.
Connecting it all Together
Since we created our AWS Lambda function and provided the API-endpoint access using API gateway, it’s time to connect all the pieces together and test them. Put following JQuery code in your ContactUs HTML page <head> section. Replace URL variable with your API Gateway URL. You can change field validation as per your need.
function submitToAPI(e) {
e.preventDefault();
var URL = "https://abc1234.execute-api.us-east-1.amazonaws.com/01/contact";
var Namere = /[A-Za-z]{1}[A-Za-z]/;
if (!Namere.test($("#name-input").val())) {
alert ("Name can not less than 2 char");
return;
}
var mobilere = /[0-9]{10}/;
if (!mobilere.test($("#phone-input").val())) {
alert ("Please enter valid mobile number");
return;
}
if ($("#email-input").val()=="") {
alert ("Please enter your email id");
return;
}
var reeamil = /^([\w-\.][email protected]([\w-]+\.)+[\w-]{2,6})?$/;
if (!reeamil.test($("#email-input").val())) {
alert ("Please enter valid email address");
return;
}
var name = $("#name-input").val();
var phone = $("#phone-input").val();
var email = $("#email-input").val();
var desc = $("#description-input").val();
var data = {
name : name,
phone : phone,
email : email,
desc : desc
};
$.ajax({
type: "POST",
url : "https://abc1234.execute-api.us-east-1.amazonaws.com/01/contact",
dataType: "json",
crossDomain: "true",
contentType: "application/json; charset=utf-8",
data: JSON.stringify(data),
success: function () {
// clear form and show a success message
alert("Successfull");
document.getElementById("contact-form").reset();
location.reload();
},
error: function () {
// show an error message
alert("UnSuccessfull");
}});
}
Now you should be able to submit your contact form and start receiving email notifications when a form is completed and submitted.
Conclusion
Here we are addressing a common use case — a simple contact form — which is important for any small business hosting their website on Amazon S3. This post should help make your static website more dynamic without spinning up any server.
Have you had challenges adding a “contact us” form to your small business website?
About the author
Saurabh Shrivastava is a Solutions Architect working with global systems integrators. He works with our partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environment. In his spare time, he enjoys spending time with his family, hiking, and biking.
Last year, we released Amazon Connect, a cloud-based contact center service that enables any business to deliver better customer service at low cost. This service is built based on the same technology that empowers Amazon customer service associates. Using this system, associates have millions of conversations with customers when they inquire about their shipping or order information. Because we made it available as an AWS service, you can now enable your contact center agents to make or receive calls in a matter of minutes. You can do this without having to provision any kind of hardware. 2
There are several advantages of building your contact center in the AWS Cloud, as described in our documentation. In addition, customers can extend Amazon Connect capabilities by using AWS products and the breadth of AWS services. In this blog post, we focus on how to get analytics out of the rich set of data published by Amazon Connect. We make use of an Amazon Connect data stream and create an end-to-end workflow to offer an analytical solution that can be customized based on need.
Solution overview
The following diagram illustrates the solution.
In this solution, Amazon Connect exports its contact trace records (CTRs) using Amazon Kinesis. CTRs are data streams in JSON format, and each has information about individual contacts. For example, this information might include the start and end time of a call, which agent handled the call, which queue the user chose, queue wait times, number of holds, and so on. You can enable this feature by reviewing our documentation.
In this architecture, we use Kinesis Firehose to capture Amazon Connect CTRs as raw data in an Amazon S3 bucket. We don’t use the recent feature added by Kinesis Firehose to save the data in S3 as Apache Parquet format. We use AWS Glue functionality to automatically detect the schema on the fly from an Amazon Connect data stream.
The primary reason for this approach is that it allows us to use attributes and enables an Amazon Connect administrator to dynamically add more fields as needed. Also by converting data to parquet in batch (every couple of hours) compression can be higher. However, if your requirement is to ingest the data in Parquet format on realtime, we recoment using Kinesis Firehose recently launched feature. You can review this blog post for further information.
By default, Firehose puts these records in time-series format. To make it easy for AWS Glue crawlers to capture information from new records, we use AWS Lambda to move all new records to a single S3 prefix called flatfiles. Our Lambda function is configured using S3 event notification. To comply with AWS Glue and Athena best practices, the Lambda function also converts all column names to lowercase. Finally, we also use the Lambda function to start AWS Glue crawlers. AWS Glue crawlers identify the data schema and update the AWS Glue Data Catalog, which is used by extract, transform, load (ETL) jobs in AWS Glue in the latter half of the workflow.
You can see our approach in the Lambda code following.
from __future__ import print_function
import json
import urllib
import boto3
import os
import re
s3 = boto3.resource('s3')
client = boto3.client('s3')
def convertColumntoLowwerCaps(obj):
for key in obj.keys():
new_key = re.sub(r'[\W]+', '', key.lower())
v = obj[key]
if isinstance(v, dict):
if len(v) > 0:
convertColumntoLowwerCaps(v)
if new_key != key:
obj[new_key] = obj[key]
del obj[key]
return obj
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
try:
client.download_file(bucket, key, '/tmp/file.json')
with open('/tmp/out.json', 'w') as output, open('/tmp/file.json', 'rb') as file:
i = 0
for line in file:
for object in line.replace("}{","}\n{").split("\n"):
record = json.loads(object,object_hook=convertColumntoLowwerCaps)
if i != 0:
output.write("\n")
output.write(json.dumps(record))
i += 1
newkey = 'flatfiles/' + key.replace("/", "")
client.upload_file('/tmp/out.json', bucket,newkey)
s3.Object(bucket,key).delete()
return "success"
except Exception as e:
print(e)
print('Error coping object {} from bucket {}'.format(key, bucket))
raise e
We trigger AWS Glue crawlers based on events because this approach lets us capture any new data frame that we want to be dynamic in nature. CTR attributes are designed to offer multiple custom options based on a particular call flow. Attributes are essentially key-value pairs in nested JSON format. With the help of event-based AWS Glue crawlers, you can easily identify newer attributes automatically.
We recommend setting up an S3 lifecycle policy on the flatfiles folder that keeps records only for 24 hours. Doing this optimizes AWS Glue ETL jobs to process a subset of files rather than the entire set of records.
After we have data in the flatfiles folder, we use AWS Glue to catalog the data and transform it into Parquet format inside a folder called parquet/ctr/. The AWS Glue job performs the ETL that transforms the data from JSON to Parquet format. We use AWS Glue crawlers to capture any new data frame inside the JSON code that we want to be dynamic in nature. What this means is that when you add new attributes to an Amazon Connect instance, the solution automatically recognizes them and incorporates them in the schema of the results.
After AWS Glue stores the results in Parquet format, you can perform analytics using Amazon Redshift Spectrum, Amazon Athena, or any third-party data warehouse platform. To keep this solution simple, we have used Amazon Athena for analytics. Amazon Athena allows us to query data without having to set up and manage any servers or data warehouse platforms. Additionally, we only pay for the queries that are executed.
Try it out!
You can get started with our sample AWS CloudFormation template. This template creates the components starting from the Kinesis stream and finishes up with S3 buckets, the AWS Glue job, and crawlers. To deploy the template, open the AWS Management Console by clicking the following link.
In the console, specify the following parameters:
BucketName: The name for the bucket to store all the solution files. This name must be unique; if it’s not, template creation fails.
etlJobSchedule: The schedule in cron format indicating how often the AWS Glue job runs. The default value is every hour.
KinesisStreamName: The name of the Kinesis stream to receive data from Amazon Connect. This name must be different from any other Kinesis stream created in your AWS account.
s3interval: The interval in seconds for Kinesis Firehose to save data inside the flatfiles folder on S3. The value must between 60 and 900 seconds.
sampledata: When this parameter is set to true, sample CTR records are used. Doing this lets you try this solution without setting up an Amazon Connect instance. All examples in this walkthrough use this sample data.
Select the “I acknowledge that AWS CloudFormation might create IAM resources.” check box, and then choose Create. After the template finishes creating resources, you can see the stream name on the stack Outputs tab.
If you haven’t created your Amazon Connect instance, you can do so by following the Getting Started Guide. When you are done creating, choose your Amazon Connect instance in the console, which takes you to instance settings. Choose Data streaming to enable streaming for CTR records. Here, you can choose the Kinesis stream (defined in the KinesisStreamName parameter) that was created by the CloudFormation template.
Now it’s time to generate the data by making or receiving calls by using Amazon Connect. You can go to Amazon Connect Cloud Control Panel (CCP) to make or receive calls using a software phone or desktop phone. After a few minutes, we should see data inside the flatfiles folder. To make it easier to try this solution, we provide sample data that you can enable by setting the sampledata parameter to true in your CloudFormation template.
You can navigate to the AWS Glue console by choosing Jobs on the left navigation pane of the console. We can select our job here. In my case, the job created by CloudFormation is called glueJob-i3TULzVtP1W0; yours should be similar. You run the job by choosing Run job for Action.
After that, we wait for the AWS Glue job to run and to finish successfully. We can track the status of the job by checking the History tab.
When the job finishes running, we can check the Database section. There should be a new table created called ctr in Parquet format.
To query the data with Athena, we can select the ctr table, and for Action choose View data.
Doing this takes us to the Athena console. If you run a query, Athena shows a preview of the data.
When we can query the data using Athena, we can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, we must make sure to grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide. We can then create a new data set in Amazon QuickSight based on the Athena table that was created.
After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.
Then we add a new data set.
We choose Athena as the source and give the data source a name (in this case, I named it connectctr).
Choose the name of the database and the table referencing the Parquet results.
Then choose Visualize.
After that, we should see the following screen.
Now we can create some visualizations. First, search for the agent.username column, and drag it to the AutoGraph section.
We can see the agents and the number of calls for each, so we can easily see which agents have taken the largest amount of calls. If we want to see from what queues the calls came for each agent, we can add the queue.arn column to the visual.
After following all these steps, you can use Amazon QuickSight to add different columns from the call records and perform different types of visualizations. You can build dashboards that continuously monitor your connect instance. You can share those dashboards with others in your organization who might need to see this data.
Conclusion
In this post, you see how you can use services like AWS Lambda, AWS Glue, and Amazon Athena to process Amazon Connect call records. The post also demonstrates how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that recognized by AWS Glue crawlers. Finally, the post shows how to used Amazon QuickSight to perform visualizations.
You can use the provided template to analyze your own contact center instance. Or you can take the CloudFormation template and modify it to process other data streams that can be ingested using Amazon Kinesis or stored on Amazon S3.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.
Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting variety of customer workloads.
Previously, I showed you how to rotate Amazon RDS database credentials automatically with AWS Secrets Manager. In addition to database credentials, AWS Secrets Manager makes it easier to rotate, manage, and retrieve API keys, OAuth tokens, and other secrets throughout their lifecycle. You can configure Secrets Manager to rotate these secrets automatically, which can help you meet your compliance needs. You can also use Secrets Manager to rotate secrets on demand, which can help you respond quickly to security events. In this post, I show you how to store an API key in Secrets Manager and use a custom Lambda function to rotate the key automatically. I’ll use a Twitter API key and bearer token as an example; you can reference this example to rotate other types of API keys.
The instructions are divided into four main phases:
Store a Twitter API key and bearer token in Secrets Manager.
Create a custom Lambda function to rotate the bearer token.
Configure your application to retrieve the bearer token from Secrets Manager.
Configure Secrets Manager to use the custom Lambda function to rotate the bearer token automatically.
For the purpose of this post, I use the placeholder Demo/Twitter_Api_Key to denote the API key, the placeholder Demo/Twitter_bearer_token to denote the bearer token, and placeholder Lambda_Rotate_Bearer_Token to denote the custom Lambda function. Be sure to replace these placeholders with the resource names from your account.
Phase 1: Store a Twitter API key and bearer token in Secrets Manager
Twitter enables developers to register their applications and retrieve an API key, which includes a consumer_key and consumer_secret. Developers use these to generate a bearer token that applications can then use to authenticate and retrieve information from Twitter. At any given point of time, you can use an API key to create only one valid bearer token.
Start by storing the API key in Secrets Manager. Here’s how:
Figure 1: The “Store a new secret” button in the AWS Secrets Manager console
Select Other type of secrets (because you’re storing an API key).
Input the consumer_key and consumer_secret, and then select Next.
Figure 2: Select the consumer_key and the consumer_secret
Specify values for Secret Name and Description, then select Next. For this example, I use Demo/Twitter_API_Key.
Figure 3: Set values for “Secret Name” and “Description”
On the next screen, keep the default setting, Disable automatic rotation, because you’ll use the same API key to rotate bearer tokens programmatically and automatically. Applications and employees will not retrieve this API key. Select Next.
Figure 4: Keep the default “Disable automatic rotation” setting
Review the information on the next screen and, if everything looks correct, select Store. You’ve now successfully stored a Twitter API key in Secrets Manager.
Next, store the bearer token in Secrets Manager. Here’s how:
From the Secrets Manager console, select Store a new secret, select Other type of secrets, input details (access_token, token_type, and ARN of the API key) about the bearer token, and then select Next.
Figure 5: Add details about the bearer token
Specify values for Secret Name and Description, and then select Next. For this example, I use Demo/Twitter_bearer_token.
Figure 6: Again set values for “Secret Name” and “Description”
Keep the default rotation setting, Disable automatic rotation, and then select Next. You’ll enable rotation after you’ve updated the application to use Secrets Manager APIs to retrieve secrets.
Review the information and select Store. You’ve now completed storing the bearer token in Secrets Manager. I take note of the sample code provided on the review page. I’ll use this code to update my application to retrieve the bearer token using Secrets Manager APIs.
Figure 7: The sample code you can use in your app
Phase 2: Create a custom Lambda function to rotate the bearer token
While Secrets Manager supports rotating credentials for databases hosted on Amazon RDS natively, it also enables you to meet your unique rotation-related use cases by authoring custom Lambda functions. Now that you’ve stored the API key and bearer token, you’ll create a Lambda function to rotate the bearer token. For this example, I’ll create my Lambda function using Python 3.6.
Figure 8: In the Lambda console, select “Create function”
Select Author from scratch. For this example, I use the name Lambda_Rotate_Bearer_Token for my Lambda function. I also set the Runtime environment as Python 3.6.
Figure 9: Create a new function from scratch
This Lambda function requires permissions to call AWS resources on your behalf. To grant these permissions, select Create a custom role. This opens a console tab.
Select Create a new IAM Role and specify the value for Role Name. For this example, I use Role_Lambda_Rotate_Twitter_Bearer_Token.
Figure 10: For “IAM Role,” select “Create a new IAM role”
Next, to define the IAM permissions, copy and paste the following IAM policy in the View Policy Document text-entry field. Be sure to replace the placeholder ARN-OF-Demo/Twitter_API_Key with the ARN of your secret.
Figure 11: The IAM policy pasted in the “View Policy Document” text-entry field
Now, select Allow. This brings me back to the Lambda console with the appropriate Role selected.
Select Create function.
Figure 12: Select the “Create function” button in the lower-right corner
Copy the following Python code and paste it in the Function code section.
import base64
import json
import logging
import os
import boto3
from botocore.vendored import requests
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""Secrets Manager Twitter Bearer Token Handler
This handler uses the master-user rotation scheme to rotate a bearer token of a Twitter app.
The Secret PlaintextString is expected to be a JSON string with the following format:
{
'access_token': ,
'token_type': ,
'masterarn':
}
Args:
event (dict): Lambda dictionary of event parameters. These keys must include the following:
- SecretId: The secret ARN or identifier
- ClientRequestToken: The ClientRequestToken of the secret version
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
context (LambdaContext): The Lambda runtime information
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not properly configured for rotation
KeyError: If the secret json does not contain the expected keys
"""
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
# Setup the client and environment variables
service_client = boto3.client('secretsmanager', endpoint_url=os.environ['SECRETS_MANAGER_ENDPOINT'])
oauth2_token_url = os.environ['TWITTER_OAUTH2_TOKEN_URL']
oauth2_invalid_token_url = os.environ['TWITTER_OAUTH2_INVALID_TOKEN_URL']
tweet_search_url = os.environ['TWITTER_SEARCH_URL']
# Make sure the version is staged correctly
metadata = service_client.describe_secret(SecretId=arn)
if not metadata['RotationEnabled']:
logger.error("Secret %s is not enabled for rotation" % arn)
raise ValueError("Secret %s is not enabled for rotation" % arn)
versions = metadata['VersionIdsToStages']
if token not in versions:
logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
if "AWSCURRENT" in versions[token]:
logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
return
elif "AWSPENDING" not in versions[token]:
logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
# Call the appropriate step
if step == "createSecret":
create_secret(service_client, arn, token, oauth2_token_url, oauth2_invalid_token_url)
elif step == "setSecret":
set_secret(service_client, arn, token, oauth2_token_url)
elif step == "testSecret":
test_secret(service_client, arn, token, tweet_search_url)
elif step == "finishSecret":
finish_secret(service_client, arn, token)
else:
logger.error("lambda_handler: Invalid step parameter %s for secret %s" % (step, arn))
raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))
def create_secret(service_client, arn, token, oauth2_token_url, oauth2_invalid_token_url):
"""Get a new bearer token from Twitter
This method invalidates existing bearer token for the Twitter app and retrieves a new one from Twitter.
If a secret version with AWSPENDING stage exists, updates it with the newly retrieved bearer token and if
the AWSPENDING stage does not exist, creates a new version of the secret with that stage label.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
oauth2_token_url (string): The Twitter API endpoint to request a bearer token
oauth2_invalid_token_url (string): The Twitter API endpoint to invalidate a bearer token
Raises:
ValueError: If the current secret is not valid JSON
KeyError: If the secret json does not contain the expected keys
ResourceNotFoundException: If the current secret is not found
"""
# Make sure the current secret exists and try to get the master arn from the secret
try:
current_secret_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
master_arn = current_secret_dict['masterarn']
logger.info("createSecret: Successfully retrieved secret for %s." % arn)
except service_client.exceptions.ResourceNotFoundException:
return
# create bearer token credentials to be passed as authorization string to Twitter
bearer_token_credentials = encode_credentials(service_client, master_arn, "AWSCURRENT")
# get the bearer token from Twitter
bearer_token_from_twitter = get_bearer_token(bearer_token_credentials,oauth2_token_url)
# invalidate the current bearer token
invalidate_bearer_token(oauth2_invalid_token_url,bearer_token_credentials,bearer_token_from_twitter)
# get a new bearer token from Twitter
new_bearer_token = get_bearer_token(bearer_token_credentials, oauth2_token_url)
# if a secret version with AWSPENDING stage exists, update it with the lastest bearer token
# if the AWSPENDING stage does not exist, then create the version with AWSPENDING stage
try:
pending_secret_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)
pending_secret_dict['access_token'] = new_bearer_token
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(pending_secret_dict), VersionStages=['AWSPENDING'])
logger.info("createSecret: Successfully invalidated the bearer token of the secret %s and updated the pending version" % arn)
except service_client.exceptions.ResourceNotFoundException:
current_secret_dict['access_token'] = new_bearer_token
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_secret_dict), VersionStages=['AWSPENDING'])
logger.info("createSecret: Successfully invalidated the bearer token of the secret %s and and created the pending version." % arn)
def set_secret(service_client, arn, token, oauth2_token_url):
"""Validate the pending secret with that in Twitter
This method checks wether the bearer token in Twitter is the same as the one in the version with AWSPENDING stage.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
oauth2_token_url (string): The Twitter API endopoint to get a bearer token
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON or master credentials could not be used to login to DB
KeyError: If the secret json does not contain the expected keys
"""
# First get the pending version of the bearer token and compare it with that in Twitter
pending_secret_dict = get_secret_dict(service_client, arn, "AWSPENDING")
master_arn = pending_secret_dict['masterarn']
# create bearer token credentials to be passed as authorization string to Twitter
bearer_token_credentials = encode_credentials(service_client, master_arn, "AWSCURRENT")
# get the bearer token from Twitter
bearer_token_from_twitter = get_bearer_token(bearer_token_credentials, oauth2_token_url)
# if the bearer tokens are same, invalidate the bearer token in Twitter
# if not, raise an exception that bearer token in Twitter was changed outside Secrets Manager
if pending_secret_dict['access_token'] == bearer_token_from_twitter:
logger.info("createSecret: Successfully verified the bearer token of arn %s" % arn)
else:
raise ValueError("The bearer token of the Twitter app was changed outside Secrets Manager. Please check.")
def test_secret(service_client, arn, token, tweet_search_url):
"""Test the pending secret by calling a Twitter API
This method tries to use the bearer token in the secret version with AWSPENDING stage and search for tweets
with 'aws secrets manager' string.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON or pending credentials could not be used to login to the database
KeyError: If the secret json does not contain the expected keys
"""
# First get the pending version of the bearer token and compare it with that in Twitter
pending_secret_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)
# Now verify you can search for tweets using the bearer token
if verify_bearer_token(pending_secret_dict['access_token'], tweet_search_url):
logger.info("testSecret: Successfully authorized with the pending secret in %s." % arn)
return
else:
logger.error("testSecret: Unable to authorize with the pending secret of secret ARN %s" % arn)
raise ValueError("Unable to connect to Twitter with pending secret of secret ARN %s" % arn)
def finish_secret(service_client, arn, token):
"""Finish the rotation by marking the pending secret as current
This method moves the secret from the AWSPENDING stage to the AWSCURRENT stage.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
"""
# First describe the secret to get the current version
metadata = service_client.describe_secret(SecretId=arn)
current_version = None
for version in metadata["VersionIdsToStages"]:
if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
if version == token:
# The correct version is already marked as current, return
logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
return
current_version = version
break
# Finalize by staging the secret version current
service_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version)
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (version, arn))
def encode_credentials(service_client, arn, stage):
"""Encodes the Twitter credentials
This helper function encodes the Twitter credentials (consumer_key and consumer_secret)
Args:
service_client (client):The secrets manager service client
arn (string): The secret ARN or other identifier
stage (stage): The stage identifying the secret version
Returns:
encoded_credentials (string): base64 encoded authorization string for Twitter
Raises:
KeyError: If the secret json does not contain the expected keys
"""
required_fields = ['consumer_key','consumer_secret']
master_secret_dict = get_secret_dict(service_client, arn, stage)
for field in required_fields:
if field not in master_secret_dict:
raise KeyError("%s key is missing from the secret JSON" % field)
encoded_credentials = base64.urlsafe_b64encode(
'{}:{}'.format(master_secret_dict['consumer_key'], master_secret_dict['consumer_secret']).encode('ascii')).decode('ascii')
return encoded_credentials
def get_bearer_token(encoded_credentials, oauth2_token_url):
"""Gets a bearer token from Twitter
This helper function retrieves the current bearer token from Twitter, given a set of credentials.
Args:
encoded_credentials (string): Twitter credentials for authentication
oauth2_token_url (string): REST API endpoint to request a bearer token from Twitter
Raises:
KeyError: If the secret json does not contain the expected keys
"""
headers = {
'Authorization': 'Basic {}'.format(encoded_credentials),
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
}
data = 'grant_type=client_credentials'
response = requests.post(oauth2_token_url, headers=headers, data=data)
response_data = response.json()
if response_data['token_type'] == 'bearer':
bearer_token = response_data['access_token']
return bearer_token
else:
raise RuntimeError('unexpected token type: {}'.format(response_data['token_type']))
def invalidate_bearer_token(oauth2_invalid_token_url, bearer_token_credentials, bearer_token):
"""Invalidates a Bearer Token of a Twitter App
This helper function invalidates a bearer token of a Twitter app.
If successful, it returns the invalidated bearer token, else None
Args:
oauth2_invalid_token_url (string): The Twitter API endpoint to invalidate a bearer token
bearer_token_credentials (string): encoded consumer key and consumer secret to authenticate with Twitter
bearer_token (string): The bearer token to be invalidated
Returns:
invalidated_bearer_token: The invalidated bearer token
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON
KeyError: If the secret json does not contain the expected keys
"""
headers = {
'Authorization': 'Basic {}'.format(bearer_token_credentials),
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
}
data = 'access_token=' + bearer_token
invalidate_response = requests.post(oauth2_invalid_token_url, headers=headers, data=data)
invalidate_response_data = invalidate_response.json()
if invalidate_response_data:
return
else:
raise RuntimeError('Invalidate bearer token request failed')
def verify_bearer_token(bearer_token, tweet_search_url):
"""Verifies access to Twitter APIs using a bearer token
This helper function verifies that the bearer token is valid by calling Twitter's search/tweets API endpoint
Args:
bearer_token (string): The current bearer token for the application
Returns:
True or False
Raises:
KeyError: If the response of search tweets API call fails
"""
headers = {
'Authorization' : 'Bearer {}'.format(bearer_token),
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
}
search_results = requests.get(tweet_search_url, headers=headers)
try:
search_results.json()['statuses']
return True
except:
return False
def get_secret_dict(service_client, arn, stage, token=None):
"""Gets the secret dictionary corresponding for the secret arn, stage, and token
This helper function gets credentials for the arn and stage passed in and returns the dictionary by parsing the JSON string
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version, or None if no validation is desired
stage (string): The stage identifying the secret version
Returns:
SecretDictionary: Secret dictionary
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON
"""
# Only do VersionId validation against the stage if a token is passed in
if token:
secret = service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage)
else:
secret = service_client.get_secret_value(SecretId=arn, VersionStage=stage)
plaintext = secret['SecretString']
# Parse and return the secret JSON string
return json.loads(plaintext)
Here’s what it will look like:
Figure 13: The Python code pasted in the “Function code” section
On the same page, provide the following environment variables:
Note: Resources used in this example are in US East (Ohio) region. If you intend to use another AWS Region, change the SECRETS_MANAGER_ENDPOINT set in the Environment variables to the appropriate region.
You’ve now created a Lambda function that can rotate the bearer token:
Figure 15: The new Lambda function
Before you can configure Secrets Manager to use this Lambda function, you need to update the function policy of the Lambda function. A function policy permits AWS services, such as Secrets Manager, to invoke a Lambda function on behalf of your application. You can attach a Lambda function policy from the AWS Command Line Interface (AWS CLI) or SDK. To attach a function policy, call the add-permission Lambda API from the AWS CLI.
Phase 3: Configure your application to retrieve the bearer token from Secrets Manager
Now that you’ve stored the bearer token in Secrets Manager, update the application to retrieve the bearer token from Secrets Manager instead of hard-coding this information in a configuration file or source code. For this example, I show you how to configure a Python application to retrieve this secret from Secrets Manager.
import config
def no_secrets_manager_sample()
# Get the bearer token from a config file.
Bearer_token = config.bearer_token
# Use the bearer token to authenticate requests to Twitter
Use the sample code from section titled Phase 1 and update the application to retrieve the bearer token from Secrets Manager. The following code sets up the client and retrieves and decrypts the secret Demo/Twitter_bearer_token.
# Use this code snippet in your app.
import boto3
from botocore.exceptions import ClientError
def get_secret():
secret_name = "Demo/Twitter_bearer_token"
endpoint_url = "https://secretsmanager.us-east-2.amazonaws.com"
region_name = "us-east-2"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name,
endpoint_url=endpoint_url
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
print("The requested secret " + secret_name + " was not found")
elif e.response['Error']['Code'] == 'InvalidRequestException':
print("The request was invalid due to:", e)
elif e.response['Error']['Code'] == 'InvalidParameterException':
print("The request had invalid params:", e)
else:
# Decrypted secret using the associated KMS CMK
# Depending on whether the secret was a string or binary, one of these fields will be populated
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
binary_secret_data = get_secret_value_response['SecretBinary']
# Your code goes here.
Applications require permissions to access Secrets Manager. My application runs on Amazon EC2 and uses an IAM role to get access to AWS services. I’ll attach the following policy to my IAM role, and you should take a similar action with your IAM role. This policy uses the GetSecretValue action to grant my application permissions to read secrets from Secrets Manager. This policy also uses the resource element to limit my application to read only the Demo/Twitter_bearer_token secret from Secrets Manager. Read the AWS Secrets Manager documentation to understand the minimum IAM permissions required to retrieve a secret.
{
"Version": "2012-10-17",
"Statement": {
"Sid": "RetrieveBearerToken",
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": Input ARN of the secret Demo/Twitter_bearer_token here
}
}
Note: To improve the resiliency of your applications, associate your application with two API keys/bearer tokens. This is a higher availability option because you can continue to use one bearer token while Secrets Manager rotates the other token. Read the AWS documentation to learn how AWS Secrets Manager rotates your secrets.
Phase 4: Enable and verify rotation
Now that you’ve stored the secret in Secrets Manager and created a Lambda function to rotate this secret, configure Secrets Manager to rotate the secret Demo/Twitter_bearer_token.
From the Secrets Manager console, go to the list of secrets and choose the secret you created in the first step (in my example, this is named Demo/Twitter_bearer_token).
Scroll to Rotation configuration, and then select Edit rotation.
Figure 16: Select the “Edit rotation” button
To enable rotation, select Enable automatic rotation, and then choose how frequently you want Secrets Manager to rotate this secret. For this example, I set the rotation interval to 30 days. I also choose the rotation Lambda function, Lambda_Rotate_Bearer_Token, from the drop-down list.
Figure 17: “Edit rotation configuration” options
The banner on the next screen confirms that I have successfully configured rotation and the first rotation is in progress, which enables you to verify that rotation is functioning as expected. Secrets Manager will rotate this credential automatically every 30 days.
Figure 18: Confirmation notice
Summary
In this post, I showed you how to configure Secrets Manager to manage and rotate an API key and bearer token used by applications to authenticate and retrieve information from Twitter. You can use the steps described in this blog to manage and rotate other API keys, as well.
Secrets Manager helps you protect access to your applications, services, and IT resources without the upfront investment and on-going maintenance costs of operating your own secrets management infrastructure. To get started, open the Secrets Manager console. To learn more, read the Secrets Manager documentation.
If you have comments about this post, submit them in the Comments section below. If you have questions about anything in this post, start a new thread on the Secrets Manager forum or contact AWS Support.
Want more AWS Security news? Follow us on Twitter.
Amazon Neptune is now Generally Available in US East (N. Virginia), US East (Ohio), US West (Oregon), and EU (Ireland). Amazon Neptune is a fast, reliable, fully-managed graph database service that makes it easy to build and run applications that work with highly connected datasets. At the core of Neptune is a purpose-built, high-performance graph database engine optimized for storing billions of relationships and querying the graph with millisecond latencies. Neptune supports two popular graph models, Property Graph and RDF, through Apache TinkerPop Gremlin and SPARQL, allowing you to easily build queries that efficiently navigate highly connected datasets. Neptune can be used to power everything from recommendation engines and knowledge graphs to drug discovery and network security. Neptune is fully-managed with automatic minor version upgrades, backups, encryption, and fail-over. I wrote about Neptune in detail for AWS re:Invent last year and customers have been using the preview and providing great feedback that the team has used to prepare the service for GA.
Now that Amazon Neptune is generally available there are a few changes from the preview:
A large number of performance enhancements and updates
Launching a Neptune cluster is as easy as navigating to the AWS Management Console and clicking create cluster. Of course you can also launch with CloudFormation, the CLI, or the SDKs.
You can monitor your cluster health and the health of individual instances through Amazon CloudWatch and the console.
Additional Resources
We’ve created two repos with some additional tools and examples here. You can expect continuous development on these repos as we add additional tools and examples.
Amazon Neptune Tools Repo This repo has a useful tool for converting GraphML files into Neptune compatible CSVs for bulk loading from S3.
Amazon Neptune Samples Repo This repo has a really cool example of building a collaborative filtering recommendation engine for video game preferences.
Purpose Built Databases
There’s an industry trend where we’re moving more and more onto purpose-built databases. Developers and businesses want to access their data in the format that makes the most sense for their applications. As cloud resources make transforming large datasets easier with tools like AWS Glue, we have a lot more options than we used to for accessing our data. With tools like Amazon Redshift, Amazon Athena, Amazon Aurora, Amazon DynamoDB, and more we get to choose the best database for the job or even enable entirely new use-cases. Amazon Neptune is perfect for workloads where the data is highly connected across data rich edges.
I’m really excited about graph databases and I see a huge number of applications. Looking for ideas of cool things to build? I’d love to build a web crawler in AWS Lambda that uses Neptune as the backing store. You could further enrich it by running Amazon Comprehend or Amazon Rekognition on the text and images found and creating a search engine on top of Neptune.
As always, feel free to reach out in the comments or on twitter to provide any feedback!
This post is courtesy of Otavio Ferreira, Manager, Amazon SNS, AWS Messaging.
Amazon SNS message filtering provides a set of string and numeric matching operators that allow each subscription to receive only the messages of interest. Hence, SNS message filtering can simplify your pub/sub messaging architecture by offloading the message filtering logic from your subscriber systems, as well as the message routing logic from your publisher systems.
After you set the subscription attribute that defines a filter policy, the subscribing endpoint receives only the messages that carry attributes matching this filter policy. Other messages published to the topic are filtered out for this subscription. In this way, the native integration between SNS and Amazon CloudWatch provides visibility into the number of messages delivered, as well as the number of messages filtered out.
CloudWatch metrics are captured automatically for you. To get started with SNS message filtering, see Filtering Messages with Amazon SNS.
Message Filtering Metrics
The following six CloudWatch metrics are relevant to understanding your SNS message filtering activity:
NumberOfMessagesPublished – Inbound traffic to SNS. This metric tracks all the messages that have been published to the topic.
NumberOfNotificationsDelivered – Outbound traffic from SNS. This metric tracks all the messages that have been successfully delivered to endpoints subscribed to the topic. A delivery takes place either when the incoming message attributes match a subscription filter policy, or when the subscription has no filter policy at all, which results in a catch-all behavior.
NumberOfNotificationsFilteredOut – This metric tracks all the messages that were filtered out because they carried attributes that didn’t match the subscription filter policy.
NumberOfNotificationsFilteredOut-NoMessageAttributes – This metric tracks all the messages that were filtered out because they didn’t carry any attributes at all and, consequently, didn’t match the subscription filter policy.
NumberOfNotificationsFilteredOut-InvalidAttributes – This metric keeps track of messages that were filtered out because they carried invalid or malformed attributes and, thus, didn’t match the subscription filter policy.
NumberOfNotificationsFailed – This last metric tracks all the messages that failed to be delivered to subscribing endpoints, regardless of whether a filter policy had been set for the endpoint. This metric is emitted after the message delivery retry policy is exhausted, and SNS stops attempting to deliver the message. At that moment, the subscribing endpoint is likely no longer reachable. For example, the subscribing SQS queue or Lambda function has been deleted by its owner. You may want to closely monitor this metric to address message delivery issues quickly.
Message filtering graphs
Through the AWS Management Console, you can compose graphs to display your SNS message filtering activity. The graph shows the number of messages published, delivered, and filtered out within the timeframe you specify (1h, 3h, 12h, 1d, 3d, 1w, or custom).
To compose an SNS message filtering graph with CloudWatch:
Open the CloudWatch console.
Choose Metrics, SNS, All Metrics, and Topic Metrics.
Select all metrics to add to the graph, such as:
NumberOfMessagesPublished
NumberOfNotificationsDelivered
NumberOfNotificationsFilteredOut
Choose Graphed metrics.
In the Statistic column, switch from Average to Sum.
Title your graph with a descriptive name, such as “SNS Message Filtering”
After you have your graph set up, you may want to copy the graph link for bookmarking, emailing, or sharing with co-workers. You may also want to add your graph to a CloudWatch dashboard for easy access in the future. Both actions are available to you on the Actions menu, which is found above the graph.
Summary
SNS message filtering defines how SNS topics behave in terms of message delivery. By using CloudWatch metrics, you gain visibility into the number of messages published, delivered, and filtered out. This enables you to validate the operation of filter policies and more easily troubleshoot during development phases.
SNS message filtering can be implemented easily with existing AWS SDKs by applying message and subscription attributes across all SNS supported protocols (Amazon SQS, AWS Lambda, HTTP, SMS, email, and mobile push). CloudWatch metrics for SNS message filtering is available now, in all AWS Regions.
The adoption of Apache Spark has increased significantly over the past few years, and running Spark-based application pipelines is the new normal. Spark jobs that are in an ETL (extract, transform, and load) pipeline have different requirements—you must handle dependencies in the jobs, maintain order during executions, and run multiple jobs in parallel. In most of these cases, you can use workflow scheduler tools like Apache Oozie, Apache Airflow, and even Cron to fulfill these requirements.
Apache Oozie is a widely used workflow scheduler system for Hadoop-based jobs. However, its limited UI capabilities, lack of integration with other services, and heavy XML dependency might not be suitable for some users. On the other hand, Apache Airflow comes with a lot of neat features, along with powerful UI and monitoring capabilities and integration with several AWS and third-party services. However, with Airflow, you do need to provision and manage the Airflow server. The Cron utility is a powerful job scheduler. But it doesn’t give you much visibility into the job details, and creating a workflow using Cron jobs can be challenging.
What if you have a simple use case, in which you want to run a few Spark jobs in a specific order, but you don’t want to spend time orchestrating those jobs or maintaining a separate application? You can do that today in a serverless fashion using AWS Step Functions. You can create the entire workflow in AWS Step Functions and interact with Spark on Amazon EMR through Apache Livy.
In this post, I walk you through a list of steps to orchestrate a serverless Spark-based ETL pipeline using AWS Step Functions and Apache Livy.
Input data
For the source data for this post, I use the New York City Taxi and Limousine Commission (TLC) trip record data. For a description of the data, see this detailed dictionary of the taxi data. In this example, we’ll work mainly with the following three columns for the Spark jobs.
Column name
Column description
RateCodeID
Represents the rate code in effect at the end of the trip (for example, 1 for standard rate, 2 for JFK airport, 3 for Newark airport, and so on).
FareAmount
Represents the time-and-distance fare calculated by the meter.
TripDistance
Represents the elapsed trip distance in miles reported by the taxi meter.
The trip data is in comma-separated values (CSV) format with the first row as a header. To shorten the Spark execution time, I trimmed the large input data to only 20,000 rows. During the deployment phase, the input file tripdata.csv is stored in Amazon S3 in the <<your-bucket>>/emr-step-functions/input/ folder.
The following image shows a sample of the trip data:
Solution overview
The next few sections describe how Spark jobs are created for this solution, how you can interact with Spark using Apache Livy, and how you can use AWS Step Functions to create orchestrations for these Spark applications.
At a high level, the solution includes the following steps:
Trigger the AWS Step Function state machine by passing the input file path.
The first stage in the state machine triggers an AWS Lambda
The Lambda function interacts with Apache Spark running on Amazon EMR using Apache Livy, and submits a Spark job.
The state machine waits a few seconds before checking the Spark job status.
Based on the job status, the state machine moves to the success or failure state.
Subsequent Spark jobs are submitted using the same approach.
The state machine waits a few seconds for the job to finish.
The job finishes, and the state machine updates with its final status.
Let’s take a look at the Spark application that is used for this solution.
Spark jobs
For this example, I built a Spark jar named spark-taxi.jar. It has two different Spark applications:
MilesPerRateCode – The first job that runs on the Amazon EMR cluster. This job reads the trip data from an input source and computes the total trip distance for each rate code. The output of this job consists of two columns and is stored in Apache Parquet format in the output path.
The following are the expected output columns:
rate_code – Represents the rate code for the trip.
total_distance – Represents the total trip distance for that rate code (for example, sum(trip_distance)).
RateCodeStatus – The second job that runs on the EMR cluster, but only if the first job finishes successfully. This job depends on two different input sets:
csv – The same trip data that is used for the first Spark job.
miles-per-rate – The output of the first job.
This job first reads the tripdata.csv file and aggregates the fare_amount by the rate_code. After this point, you have two different datasets, both aggregated by rate_code. Finally, the job uses the rate_code field to join two datasets and output the entire rate code status in a single CSV file.
The output columns are as follows:
rate_code_id – Represents the rate code type.
total_distance – Derived from first Spark job and represents the total trip distance.
total_fare_amount – A new field that is generated during the second Spark application, representing the total fare amount by the rate code type.
Note that in this case, you don’t need to run two different Spark jobs to generate that output. The goal of setting up the jobs in this way is just to create a dependency between the two jobs and use them within AWS Step Functions.
Both Spark applications take one input argument called rootPath. It’s the S3 location where the Spark job is stored along with input and output data. Here is a sample of the final output:
The next section discusses how you can use Apache Livy to interact with Spark applications that are running on Amazon EMR.
Using Apache Livy to interact with Apache Spark
Apache Livy provides a REST interface to interact with Spark running on an EMR cluster. Livy is included in Amazon EMR release version 5.9.0 and later. In this post, I use Livy to submit Spark jobs and retrieve job status. When Amazon EMR is launched with Livy installed, the EMR master node becomes the endpoint for Livy, and it starts listening on port 8998 by default. Livy provides APIs to interact with Spark.
Let’s look at a couple of examples how you can interact with Spark running on Amazon EMR using Livy.
To list active running jobs, you can execute the following from the EMR master node:
curl localhost:8998/sessions
If you want to do the same from a remote instance, just change localhost to the EMR hostname, as in the following (port 8998 must be open to that remote instance through the security group):
Through Spark submit, you can pass multiple arguments for the Spark job and Spark configuration settings. You can also do that using Livy, by passing the S3 path through the args parameter, as shown following:
For a detailed list of Livy APIs, see the Apache Livy REST API page. This post uses GET /batches and POST /batches.
In the next section, you create a state machine and orchestrate Spark applications using AWS Step Functions.
Using AWS Step Functions to create a Spark job workflow
AWS Step Functions automatically triggers and tracks each step and retries when it encounters errors. So your application executes in order and as expected every time. To create a Spark job workflow using AWS Step Functions, you first create a Lambda state machine using different types of states to create the entire workflow.
First, you use the Task state—a simple state in AWS Step Functions that performs a single unit of work. You also use the Wait state to delay the state machine from continuing for a specified time. Later, you use the Choice state to add branching logic to a state machine.
The following is a quick summary of how to use different states in the state machine to create the Spark ETL pipeline:
Task state – Invokes a Lambda function. The first Task state submits the Spark job on Amazon EMR, and the next Task state is used to retrieve the previous Spark job status.
Wait state – Pauses the state machine until a job completes execution.
Choice state – Each Spark job execution can return a failure, an error, or a success state So, in the state machine, you use the Choice state to create a rule that specifies the next action or step based on the success or failure of the previous step.
Here is one of my Task states, MilesPerRateCode, which simply submits a Spark job:
"MilesPerRate Job": {
"Type": "Task",
"Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:blog-miles-per-rate-job-submit-function",
"ResultPath": "$.jobId",
"Next": "Wait for MilesPerRate job to complete"
}
This Task state configuration specifies the Lambda function to execute. Inside the Lambda function, it submits a Spark job through Livy using Livy’s POST API. Using ResultPath, it tells the state machine where to place the result of the executing task. As discussed in the previous section, Spark submit returns the session ID, which is captured with $.jobId and used in a later state.
The following code section shows the Lambda function, which is used to submit the MilesPerRateCode job. It uses the Python request library to submit a POST against the Livy endpoint hosted on Amazon EMR and passes the required parameters in JSON format through payload. It then parses the response, grabs id from the response, and returns it. The Next field tells the state machine which state to go to next.
Just like in the MilesPerRate job, another state submits the RateCodeStatus job, but it executes only when all previous jobs have completed successfully.
Here is the Task state in the state machine that checks the Spark job status:
Just like other states, the preceding Task executes a Lambda function, captures the result (represented by jobStatus), and passes it to the next state. The following is the Lambda function that checks the Spark job status based on a given session ID:
In the Choice state, it checks the Spark job status value, compares it with a predefined state status, and transitions the state based on the result. For example, if the status is success, move to the next state (RateCodeJobStatus job), and if it is dead, move to the MilesPerRate job failed state.
To set up this entire solution, you need to create a few AWS resources. To make it easier, I have created an AWS CloudFormation template. This template creates all the required AWS resources and configures all the resources that are needed to create a Spark-based ETL pipeline on AWS Step Functions.
This CloudFormation template requires you to pass the following four parameters during initiation.
Parameter
Description
ClusterSubnetID
The subnet where the Amazon EMR cluster is deployed and Lambda is configured to talk to this subnet.
KeyName
The name of the existing EC2 key pair to access the Amazon EMR cluster.
VPCID
The ID of the virtual private cloud (VPC) where the EMR cluster is deployed and Lambda is configured to talk to this VPC.
S3RootPath
The Amazon S3 path where all required files (input file, Spark job, and so on) are stored and the resulting data is written.
IMPORTANT: These templates are designed only to show how you can create a Spark-based ETL pipeline on AWS Step Functions using Apache Livy. They are not intended for production use without modification. And if you try this solution outside of the us-east-1 Region, download the necessary files from s3://aws-data-analytics-blog/emr-step-functions, upload the files to the buckets in your Region, edit the script as appropriate, and then run it.
To launch the CloudFormation stack, choose Launch Stack:
Launching this stack creates the following list of AWS resources.
Logical ID
Resource Type
Description
StepFunctionsStateExecutionRole
IAM role
IAM role to execute the state machine and have a trust relationship with the states service.
SparkETLStateMachine
AWS Step Functions state machine
State machine in AWS Step Functions for the Spark ETL workflow.
LambdaSecurityGroup
Amazon EC2 security group
Security group that is used for the Lambda function to call the Livy API.
RateCodeStatusJobSubmitFunction
AWS Lambda function
Lambda function to submit the RateCodeStatus job.
MilesPerRateJobSubmitFunction
AWS Lambda function
Lambda function to submit the MilesPerRate job.
SparkJobStatusFunction
AWS Lambda function
Lambda function to check the Spark job status.
LambdaStateMachineRole
IAM role
IAM role for all Lambda functions to use the lambda trust relationship.
EMRCluster
Amazon EMR cluster
EMR cluster where Livy is running and where the job is placed.
During the AWS CloudFormation deployment phase, it sets up S3 paths for input and output. Input files are stored in the <<s3-root-path>>/emr-step-functions/input/ path, whereas spark-taxi.jar is copied under <<s3-root-path>>/emr-step-functions/.
The following screenshot shows how the S3 paths are configured after deployment. In this example, I passed a bucket that I created in the AWS account s3://tm-app-demos for the S3 root path.
If the CloudFormation template completed successfully, you will see Spark-ETL-State-Machine in the AWS Step Functions dashboard, as follows:
Choose the Spark-ETL-State-Machine state machine to take a look at this implementation. The AWS CloudFormation template built the entire state machine along with its dependent Lambda functions, which are now ready to be executed.
On the dashboard, choose the newly created state machine, and then choose New execution to initiate the state machine. It asks you to pass input in JSON format. This input goes to the first state MilesPerRate Job, which eventually executes the Lambda function blog-miles-per-rate-job-submit-function.
Pass the S3 root path as input:
{
“rootPath”: “s3://tm-app-demos”
}
Then choose Start Execution:
The rootPath value is the same value that was passed when creating the CloudFormation stack. It can be an S3 bucket location or a bucket with prefixes, but it should be the same value that is used for AWS CloudFormation. This value tells the state machine where it can find the Spark jar and input file, and where it will write output files. After the state machine starts, each state/task is executed based on its definition in the state machine.
At a high level, the following represents the flow of events:
Execute the first Spark job, MilesPerRate.
The Spark job reads the input file from the location <<rootPath>>/emr-step-functions/input/tripdata.csv. If the job finishes successfully, it writes the output data to <<rootPath>>/emr-step-functions/miles-per-rate.
If the Spark job fails, it transitions to the error state MilesPerRate job failed, and the state machine stops. If the Spark job finishes successfully, it transitions to the RateCodeStatus Job state, and the second Spark job is executed.
If the second Spark job fails, it transitions to the error state RateCodeStatus job failed, and the state machine stops with the Failed status.
If this Spark job completes successfully, it writes the final output data to the <<rootPath>>/emr-step-functions/rate-code-status/ It also transitions the RateCodeStatus job finished state, and the state machine ends its execution with the Success status.
This following screenshot shows a successfully completed Spark ETL state machine:
The right side of the state machine diagram shows the details of individual states with their input and output.
When you execute the state machine for the second time, it fails because the S3 path already exists. The state machine turns red and stops at MilePerRate job failed. The following image represents that failed execution of the state machine:
You can also check your Spark application status and logs by going to the Amazon EMR console and viewing the Application history tab:
I hope this walkthrough paints a picture of how you can create a serverless solution for orchestrating Spark jobs on Amazon EMR using AWS Step Functions and Apache Livy. In the next section, I share some ideas for making this solution even more elegant.
Next steps
The goal of this post is to show a simple example that uses AWS Step Functions to create an orchestration for Spark-based jobs in a serverless fashion. To make this solution robust and production ready, you can explore the following options:
In this example, I manually initiated the state machine by passing the rootPath as input. You can instead trigger the state machine automatically. To run the ETL pipeline as soon as the files arrive in your S3 bucket, you can pass the new file path to the state machine. Because CloudWatch Events supports AWS Step Functions as a target, you can create a CloudWatch rule for an S3 event. You can then set AWS Step Functions as a target and pass the new file path to your state machine. You’re all set!
You can also improve this solution by adding an alerting mechanism in case of failures. To do this, create a Lambda function that sends an alert email and assigns that Lambda function to a Fail That way, when any part of your state fails, it triggers an email and notifies the user.
If you want to submit multiple Spark jobs in parallel, you can use the Parallel state type in AWS Step Functions. The Parallel state is used to create parallel branches of execution in your state machine.
With Lambda and AWS Step Functions, you can create a very robust serverless orchestration for your big data workload.
Cleaning up
When you’ve finished testing this solution, remember to clean up all those AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack named Blog-Spark-ETL-Step-Functions.
Summary
In this post, I showed you how to use AWS Step Functions to orchestrate your Spark jobs that are running on Amazon EMR. You used Apache Livy to submit jobs to Spark from a Lambda function and created a workflow for your Spark jobs, maintaining a specific order for job execution and triggering different AWS events based on your job’s outcome. Go ahead—give this solution a try, and share your experience with us!
Tanzir Musabbir is an EMR Specialist Solutions Architect with AWS. He is an early adopter of open source Big Data technologies. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.
Slack is widely used by DevOps and development teams to communicate status. Typically, when a build has been tested and is ready to be promoted to a staging environment, a QA engineer or DevOps engineer kicks off the deployment. Using Slack in a ChatOps collaboration model, the promotion can be done in a single click from a Slack channel. And because the promotion happens through a Slack channel, the whole development team knows what’s happening without checking email.
In this blog post, I will show you how to integrate AWS services with a Slack application. I use an interactive message button and incoming webhook to promote a stage with a single click.
To follow along with the steps in this post, you’ll need a pipeline in AWS CodePipeline. If you don’t have a pipeline, the fastest way to create one for this use case is to use AWS CodeStar. Go to the AWS CodeStar console and select the Static Website template (shown in the screenshot). AWS CodeStar will create a pipeline with an AWS CodeCommit repository and an AWS CodeDeploy deployment for you. After the pipeline is created, you will need to add a manual approval stage.
You’ll also need to build a Slack app with webhooks and interactive components, write two Lambda functions, and create an API Gateway API and a SNS topic.
As you’ll see in the following diagram, when I make a change and merge a new feature into the master branch in AWS CodeCommit, the check-in kicks off my CI/CD pipeline in AWS CodePipeline. When CodePipeline reaches the approval stage, it sends a notification to Amazon SNS, which triggers an AWS Lambda function (ApprovalRequester).
The Slack channel receives a prompt that looks like the following screenshot. When I click Yes to approve the build promotion, the approval result is sent to CodePipeline through API Gateway and Lambda (ApprovalHandler). The pipeline continues on to deploy the build to the next environment.
Create a Slack app
For App Name, type a name for your app. For Development Slack Workspace, choose the name of your workspace. You’ll see in the following screenshot that my workspace is AWS ChatOps.
After the Slack application has been created, you will see the Basic Information page, where you can create incoming webhooks and enable interactive components.
To add incoming webhooks:
Under Add features and functionality, choose Incoming Webhooks. Turn the feature on by selecting Off, as shown in the following screenshot.
Now that the feature is turned on, choose Add New Webhook to Workspace. In the process of creating the webhook, Slack lets you choose the channel where messages will be posted.
After the webhook has been created, you’ll see its URL. You will use this URL when you create the Lambda function.
If you followed the steps in the post, the pipeline should look like the following.
Write the Lambda function for approval requests
This Lambda function is invoked by the SNS notification. It sends a request that consists of an interactive message button to the incoming webhook you created earlier. The following sample code sends the request to the incoming webhook. WEBHOOK_URL and SLACK_CHANNEL are the environment variables that hold values of the webhook URL that you created and the Slack channel where you want the interactive message button to appear.
# This function is invoked via SNS when the CodePipeline manual approval action starts.
# It will take the details from this approval notification and sent an interactive message to Slack that allows users to approve or cancel the deployment.
import os
import json
import logging
import urllib.parse
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# This is passed as a plain-text environment variable for ease of demonstration.
# Consider encrypting the value with KMS or use an encrypted parameter in Parameter Store for production deployments.
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
message = event["Records"][0]["Sns"]["Message"]
data = json.loads(message)
token = data["approval"]["token"]
codepipeline_name = data["approval"]["pipelineName"]
slack_message = {
"channel": SLACK_CHANNEL,
"text": "Would you like to promote the build to production?",
"attachments": [
{
"text": "Yes to deploy your build to production",
"fallback": "You are unable to promote a build",
"callback_id": "wopr_game",
"color": "#3AA3E3",
"attachment_type": "default",
"actions": [
{
"name": "deployment",
"text": "Yes",
"style": "danger",
"type": "button",
"value": json.dumps({"approve": True, "codePipelineToken": token, "codePipelineName": codepipeline_name}),
"confirm": {
"title": "Are you sure?",
"text": "This will deploy the build to production",
"ok_text": "Yes",
"dismiss_text": "No"
}
},
{
"name": "deployment",
"text": "No",
"type": "button",
"value": json.dumps({"approve": False, "codePipelineToken": token, "codePipelineName": codepipeline_name})
}
]
}
]
}
req = Request(SLACK_WEBHOOK_URL, json.dumps(slack_message).encode('utf-8'))
response = urlopen(req)
response.read()
return None
Create a SNS topic
Create a topic and then create a subscription that invokes the ApprovalRequester Lambda function. You can configure the manual approval action in the pipeline to send a message to this SNS topic when an approval action is required. When the pipeline reaches the approval stage, it sends a notification to this SNS topic. SNS publishes a notification to all of the subscribed endpoints. In this case, the Lambda function is the endpoint. Therefore, it invokes and executes the Lambda function. For information about how to create a SNS topic, see Create a Topic in the Amazon SNS Developer Guide.
Write the Lambda function for handling the interactive message button
This Lambda function is invoked by API Gateway. It receives the result of the interactive message button whether or not the build promotion was approved. If approved, an API call is made to CodePipeline to promote the build to the next environment. If not approved, the pipeline stops and does not move to the next stage.
The Lambda function code might look like the following. SLACK_VERIFICATION_TOKEN is the environment variable that contains your Slack verification token. You can find your verification token under Basic Information on Slack manage app page. When you scroll down, you will see App Credential. Verification token is found under the section.
# This function is triggered via API Gateway when a user acts on the Slack interactive message sent by approval_requester.py.
from urllib.parse import parse_qs
import json
import os
import boto3
SLACK_VERIFICATION_TOKEN = os.environ['SLACK_VERIFICATION_TOKEN']
#Triggered by API Gateway
#It kicks off a particular CodePipeline project
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
body = parse_qs(event['body'])
payload = json.loads(body['payload'][0])
# Validate Slack token
if SLACK_VERIFICATION_TOKEN == payload['token']:
send_slack_message(json.loads(payload['actions'][0]['value']))
# This will replace the interactive message with a simple text response.
# You can implement a more complex message update if you would like.
return {
"isBase64Encoded": "false",
"statusCode": 200,
"body": "{\"text\": \"The approval has been processed\"}"
}
else:
return {
"isBase64Encoded": "false",
"statusCode": 403,
"body": "{\"error\": \"This request does not include a vailid verification token.\"}"
}
def send_slack_message(action_details):
codepipeline_status = "Approved" if action_details["approve"] else "Rejected"
codepipeline_name = action_details["codePipelineName"]
token = action_details["codePipelineToken"]
client = boto3.client('codepipeline')
response_approval = client.put_approval_result(
pipelineName=codepipeline_name,
stageName='Approval',
actionName='ApprovalOrDeny',
result={'summary':'','status':codepipeline_status},
token=token)
print(response_approval)
Create the API Gateway API
In the Amazon API Gateway console, create a resource called InteractiveMessageHandler.
Create a POST method.
For Integration type, choose Lambda Function.
Select Use Lambda Proxy integration.
From Lambda Region, choose a region.
In Lambda Function, type a name for your function.
Now go back to your Slack application and enable interactive components.
To enable interactive components for the interactive message (Yes) button:
Under Features, choose Interactive Components.
Choose Enable Interactive Components.
Type a request URL in the text box. Use the invoke URL in Amazon API Gateway that will be called when the approval button is clicked.
Now that all the pieces have been created, run the solution by checking in a code change to your CodeCommit repo. That will release the change through CodePipeline. When the CodePipeline comes to the approval stage, it will prompt to your Slack channel to see if you want to promote the build to your staging or production environment. Choose Yes and then see if your change was deployed to the environment.
Conclusion
That is it! You have now created a Slack ChatOps solution using AWS CodeCommit, AWS CodePipeline, AWS Lambda, Amazon API Gateway, and Amazon Simple Notification Service.
Now that you know how to do this Slack and CodePipeline integration, you can use the same method to interact with other AWS services using API Gateway and Lambda. You can also use Slack’s slash command to initiate an action from a Slack channel, rather than responding in the way demonstrated in this post.
We announced a preview of AWS IoT 1-Click at AWS re:Invent 2017 and have been refining it ever since, focusing on simplicity and a clean out-of-box experience. Designed to make IoT available and accessible to a broad audience, AWS IoT 1-Click is now generally available, along with new IoT buttons from AWS and AT&T.
I sat down with the dev team a month or two ago to learn about the service so that I could start thinking about my blog post. During the meeting they gave me a pair of IoT buttons and I started to think about some creative ways to put them to use. Here are a few that I came up with:
Help Request – Earlier this month I spent a very pleasant weekend at the HackTillDawn hackathon in Los Angeles. As the participants were hacking away, they occasionally had questions about AWS, machine learning, Amazon SageMaker, and AWS DeepLens. While we had plenty of AWS Solution Architects on hand (decked out in fashionable & distinctive AWS shirts for easy identification), I imagined an IoT button for each team. Pressing the button would alert the SA crew via SMS and direct them to the proper table.
Camera Control – Tim Bray and I were in the AWS video studio, prepping for the first episode of Tim’s series on AWS Messaging. Minutes before we opened the Twitch stream I realized that we did not have a clean, unobtrusive way to ask the camera operator to switch to a closeup view. Again, I imagined that a couple of IoT buttons would allow us to make the request.
Remote Dog Treat Dispenser – My dog barks every time a stranger opens the gate in front of our house. While it is great to have confirmation that my Ring doorbell is working, I would like to be able to press a button and dispense a treat so that Luna stops barking!
Homes, offices, factories, schools, vehicles, and health care facilities can all benefit from IoT buttons and other simple IoT devices, all managed using AWS IoT 1-Click.
All About AWS IoT 1-Click As I said earlier, we have been focusing on simplicity and a clean out-of-box experience. Here’s what that means:
Architects can dream up applications for inexpensive, low-powered devices.
Developers don’t need to write any device-level code. They can make use of pre-built actions, which send email or SMS messages, or write their own custom actions using AWS Lambda functions.
Installers don’t have to install certificates or configure cloud endpoints on newly acquired devices, and don’t have to worry about firmware updates.
Administrators can monitor the overall status and health of each device, and can arrange to receive alerts when a device nears the end of its useful life and needs to be replaced, using a single interface that spans device types and manufacturers.
I’ll show you how easy this is in just a moment. But first, let’s talk about the current set of devices that are supported by AWS IoT 1-Click.
Who’s Got the Button? We’re launching with support for two types of buttons (both pictured above). Both types of buttons are pre-configured with X.509 certificates, communicate to the cloud over secure connections, and are ready to use.
The AWS IoT Enterprise Button communicates via Wi-Fi. It has a 2000-click lifetime, encrypts outbound data using TLS, and can be configured using BLE and our mobile app. It retails for $19.99 (shipping and handling not included) and can be used in the United States, Europe, and Japan.
The AT&T LTE-M Button communicates via the LTE-M cellular network. It has a 1500-click lifetime, and also encrypts outbound data using TLS. The device and the bundled data plan is available an an introductory price of $29.99 (shipping and handling not included), and can be used in the United States.
We are very interested in working with device manufacturers in order to make even more shapes, sizes, and types of devices (badge readers, asset trackers, motion detectors, and industrial sensors, to name a few) available to our customers. Our team will be happy to tell you about our provisioning tools and our facility for pushing OTA (over the air) updates to large fleets of devices; you can contact them at [email protected].
AWS IoT 1-Click Concepts I’m eager to show you how to use AWS IoT 1-Click and the buttons, but need to introduce a few concepts first.
Device – A button or other item that can send messages. Each device is uniquely identified by a serial number.
Placement Template – Describes a like-minded collection of devices to be deployed. Specifies the action to be performed and lists the names of custom attributes for each device.
Placement – A device that has been deployed. Referring to placements instead of devices gives you the freedom to replace and upgrade devices with minimal disruption. Each placement can include values for custom attributes such as a location (“Building 8, 3rd Floor, Room 1337”) or a purpose (“Coffee Request Button”).
Action – The AWS Lambda function to invoke when the button is pressed. You can write a function from scratch, or you can make use of a pair of predefined functions that send an email or an SMS message. The actions have access to the attributes; you can, for example, send an SMS message with the text “Urgent need for coffee in Building 8, 3rd Floor, Room 1337.”
Getting Started with AWS IoT 1-Click Let’s set up an IoT button using the AWS IoT 1-Click Console:
If I didn’t have any buttons I could click Buy devices to get some. But, I do have some, so I click Claim devices to move ahead. I enter the device ID or claim code for my AT&T button and click Claim (I can enter multiple claim codes or device IDs if I want):
The AWS buttons can be claimed using the console or the mobile app; the first step is to use the mobile app to configure the button to use my Wi-Fi:
Then I scan the barcode on the box and click the button to complete the process of claiming the device. Both of my buttons are now visible in the console:
I am now ready to put them to use. I click on Projects, and then Create a project:
I name and describe my project, and click Next to proceed:
Now I define a device template, along with names and default values for the placement attributes. Here’s how I set up a device template (projects can contain several, but I just need one):
The action has two mandatory parameters (phone number and SMS message) built in; I add three more (Building, Room, and Floor) and click Create project:
I’m almost ready to ask for some coffee! The next step is to associate my buttons with this project by creating a placement for each one. I click Create placements to proceed. I name each placement, select the device to associate with it, and then enter values for the attributes that I established for the project. I can also add additional attributes that are peculiar to this placement:
I can inspect my project and see that everything looks good:
I click on the buttons and the SMS messages appear:
I can monitor device activity in the AWS IoT 1-Click Console:
And also in the Lambda Console:
The Lambda function itself is also accessible, and can be used as-is or customized:
As you can see, this is the code that lets me use {{*}}include all of the placement attributes in the message and {{Building}} (for example) to include a specific placement attribute.
Now Available I’ve barely scratched the surface of this cool new service and I encourage you to give it a try (or a click) yourself. Buy a button or two, build something cool, and let me know all about it!
Pricing is based on the number of enabled devices in your account, measured monthly and pro-rated for partial months. Devices can be enabled or disabled at any time. See the AWS IoT 1-Click Pricing page for more info.
I’m happy to announce that Sumerian is now generally available. You can create realistic virtual environments and scenes without having to acquire or master specialized tools for 3D modeling, animation, lighting, audio editing, or programming. Once built, you can deploy your finished creation across multiple platforms without having to write custom code or deal with specialized deployment systems and processes.
Sumerian gives you a web-based editor that you can use to quickly and easily create realistic, professional-quality scenes. There’s a visual scripting tool that lets you build logic to control how objects and characters (Sumerian Hosts) respond to user actions. Sumerian also lets you create rich, natural interactions powered by AWS services such as Amazon Lex, Polly, AWS Lambda, AWS IoT, and Amazon DynamoDB.
Sumerian was designed to work on multiple platforms. The VR and AR apps that you create in Sumerian will run in browsers that supports WebGL or WebVR and on popular devices such as the Oculus Rift, HTC Vive, and those powered by iOS or Android.
During the preview period, we have been working with a broad spectrum of customers to put Sumerian to the test and to create proof of concept (PoC) projects designed to highlight an equally broad spectrum of use cases, including employee education, training simulations, field service productivity, virtual concierge, design and creative, and brand engagement. Fidelity Labs (the internal R&D unit of Fidelity Investments), was the first to use a Sumerian host to create an engaging VR experience. Cora (the host) lives within a virtual chart room. She can display stock quotes, pull up company charts, and answer questions about a company’s performance. This PoC uses Amazon Polly to implement text to speech and Amazon Lex for conversational chatbot functionality. Read their blog post and watch the video inside to see Cora in action:
Now that Sumerian is generally available, you have the power to create engaging AR, VR, and 3D experiences of your own. To learn more, visit the Amazon Sumerian home page and then spend some quality time with our extensive collection of Sumerian Tutorials.
As a serverless computing platform that supports Java 8 runtime, AWS Lambda makes it easy to run any type of Java function simply by uploading a JAR file. To help define not only a Lambda serverless application but also Amazon API Gateway, Amazon DynamoDB, and other related services, the AWS Serverless Application Model (SAM) allows developers to use a simple AWS CloudFormation template.
AWS provides the AWS Toolkit for Eclipse that supports both Lambda and SAM. AWS also gives customers an easy way to create Lambda functions and SAM applications in Java using the AWS Command Line Interface (AWS CLI). After you build a JAR file, all you have to do is type the following commands:
To consolidate these steps, customers can use Archetype by Apache Maven. Archetype uses a predefined package template that makes getting started to develop a function exceptionally simple.
In this post, I introduce a Maven archetype that allows you to create a skeleton of AWS SAM for a Java function. Using this archetype, you can generate a sample Java code example and an accompanying SAM template to deploy it on AWS Lambda by a single Maven action.
Prerequisites
Make sure that the following software is installed on your workstation:
Java
Maven
AWS CLI
(Optional) AWS SAM CLI
Install Archetype
After you’ve set up those packages, install Archetype with the following commands:
git clone https://github.com/awslabs/aws-serverless-java-archetype
cd aws-serverless-java-archetype
mvn install
These are one-time operations, so you don’t run them for every new package. If you’d like, you can add Archetype to your company’s Maven repository so that other developers can use it later.
With those packages installed, you’re ready to develop your new Lambda Function.
Start a project
Now that you have the archetype, customize it and run the code:
cd /path/to/project_home
mvn archetype:generate \
-DarchetypeGroupId=com.amazonaws.serverless.archetypes \
-DarchetypeArtifactId=aws-serverless-java-archetype \
-DarchetypeVersion=1.0.0 \
-DarchetypeRepository=local \ # Forcing to use local maven repository
-DinteractiveMode=false \ # For batch mode
# You can also specify properties below interactively if you omit the line for batch mode
-DgroupId=YOUR_GROUP_ID \
-DartifactId=YOUR_ARTIFACT_ID \
-Dversion=YOUR_VERSION \
-DclassName=YOUR_CLASSNAME
You should have a directory called YOUR_ARTIFACT_ID that contains the files and folders shown below:
The sample code is a working example. If you install SAM CLI, you can invoke it just by the command below:
cd YOUR_ARTIFACT_ID
mvn -P invoke verify
[INFO] Scanning for projects...
[INFO]
[INFO] ---------------------------< com.riywo:foo >----------------------------
[INFO] Building foo 1.0
[INFO] --------------------------------[ jar ]---------------------------------
...
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ foo ---
[INFO] Building jar: /private/tmp/foo/target/foo-1.0.jar
[INFO]
[INFO] --- maven-shade-plugin:3.1.0:shade (shade) @ foo ---
[INFO] Including com.amazonaws:aws-lambda-java-core:jar:1.2.0 in the shaded jar.
[INFO] Replacing /private/tmp/foo/target/lambda.jar with /private/tmp/foo/target/foo-1.0-shaded.jar
[INFO]
[INFO] --- exec-maven-plugin:1.6.0:exec (sam-local-invoke) @ foo ---
2018/04/06 16:34:35 Successfully parsed template.yaml
2018/04/06 16:34:35 Connected to Docker 1.37
2018/04/06 16:34:35 Fetching lambci/lambda:java8 image for java8 runtime...
java8: Pulling from lambci/lambda
Digest: sha256:14df0a5914d000e15753d739612a506ddb8fa89eaa28dcceff5497d9df2cf7aa
Status: Image is up to date for lambci/lambda:java8
2018/04/06 16:34:37 Invoking Package.Example::handleRequest (java8)
2018/04/06 16:34:37 Decompressing /tmp/foo/target/lambda.jar
2018/04/06 16:34:37 Mounting /private/var/folders/x5/ldp7c38545v9x5dg_zmkr5kxmpdprx/T/aws-sam-local-1523000077594231063 as /var/task:ro inside runtime container
START RequestId: a6ae19fe-b1b0-41e2-80bc-68a40d094d74 Version: $LATEST
Log output: Greeting is 'Hello Tim Wagner.'
END RequestId: a6ae19fe-b1b0-41e2-80bc-68a40d094d74
REPORT RequestId: a6ae19fe-b1b0-41e2-80bc-68a40d094d74 Duration: 96.60 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 7 MB
{"greetings":"Hello Tim Wagner."}
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.452 s
[INFO] Finished at: 2018-04-06T16:34:40+09:00
[INFO] ------------------------------------------------------------------------
This maven goal invokes sam local invoke -e event.json, so you can see the sample output to greet Tim Wagner.
To deploy this application to AWS, you need an Amazon S3 bucket to upload your package. You can use the following command to create a bucket if you want:
aws s3 mb s3://YOUR_BUCKET --region YOUR_REGION
Now, you can deploy your application by just one command!
mvn deploy \
-DawsRegion=YOUR_REGION \
-Ds3Bucket=YOUR_BUCKET \
-DstackName=YOUR_STACK
[INFO] Scanning for projects...
[INFO]
[INFO] ---------------------------< com.riywo:foo >----------------------------
[INFO] Building foo 1.0
[INFO] --------------------------------[ jar ]---------------------------------
...
[INFO] --- exec-maven-plugin:1.6.0:exec (sam-package) @ foo ---
Uploading to aws-serverless-java/com.riywo:foo:1.0/924732f1f8e4705c87e26ef77b080b47 11657 / 11657.0 (100.00%)
Successfully packaged artifacts and wrote output template to file target/sam.yaml.
Execute the following command to deploy the packaged template
aws cloudformation deploy --template-file /private/tmp/foo/target/sam.yaml --stack-name <YOUR STACK NAME>
[INFO]
[INFO] --- maven-deploy-plugin:2.8.2:deploy (default-deploy) @ foo ---
[INFO] Skipping artifact deployment
[INFO]
[INFO] --- exec-maven-plugin:1.6.0:exec (sam-deploy) @ foo ---
Waiting for changeset to be created..
Waiting for stack create/update to complete
Successfully created/updated stack - archetype
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 37.176 s
[INFO] Finished at: 2018-04-06T16:41:02+09:00
[INFO] ------------------------------------------------------------------------
Maven automatically creates a shaded JAR file, uploads it to your S3 bucket, replaces template.yaml, and creates and updates the CloudFormation stack.
To customize the process, modify the pom.xml file. For example, to avoid typing values for awsRegion, s3Bucket or stackName, write them inside pom.xml and check in your VCS. Afterward, you and the rest of your team can deploy the function by typing just the following command:
mvn deploy
Options
Lambda Java 8 runtime has some types of handlers: POJO, Simple type and Stream. The default option of this archetype is POJO style, which requires to create request and response classes, but they are baked by the archetype by default. If you want to use other type of handlers, you can use handlerType property like below:
## POJO type (default)
mvn archetype:generate \
...
-DhandlerType=pojo
## Simple type - String
mvn archetype:generate \
...
-DhandlerType=simple
### Stream type
mvn archetype:generate \
...
-DhandlerType=stream
Also, Lambda Java 8 runtime supports two types of Logging class: Log4j 2 and LambdaLogger. This archetype creates LambdaLogger implementation by default, but you can use Log4j 2 if you want:
If you use LambdaLogger, you can delete ./src/main/resources/log4j2.xml. See documentation for more details.
Conclusion
So, what’s next? Develop your Lambda function locally and type the following command: mvn deploy !
With this Archetype code example, available on GitHub repo, you should be able to deploy Lambda functions for Java 8 in a snap. If you have any questions or comments, please submit them below or leave them on GitHub.
If you’ve used Amazon CloudWatch Events to schedule the invocation of a Lambda function at regular intervals, you may have noticed that the highest frequency possible is one invocation per minute. However, in some cases, you may need to invoke Lambda more often than that. In this blog post, I’ll cover invoking a Lambda function every 10 seconds, but with some simple math you can change to whatever interval you like.
To achieve this, I’ll show you how to leverage Step Functions and Amazon Kinesis Data Streams.
The Solution
For this example, I’ve created a Step Functions State Machine that invokes our Lambda function 6 times, 10 seconds apart. Such State Machine is then executed once per minute by a CloudWatch Events Rule. This state machine is then executed once per minute by an Amazon CloudWatch Events rule. Finally, the Kinesis Data Stream triggers our Lambda function for each record inserted. The result is our Lambda function being invoked every 10 seconds, indefinitely.
Below is a diagram illustrating how the various services work together.
Step 1: My sampleLambda function doesn’t actually do anything, it just simulates an execution for a few seconds. This is the (Python) code of my dummy function:
import time
import random
def lambda_handler(event, context):
rand = random.randint(1, 3)
print('Running for {} seconds'.format(rand))
time.sleep(rand)
return True
Step 2:
The next step is to create a second Lambda function, that I called Iterator, which has two duties:
It keeps track of the current number of iterations, since Step Function doesn’t natively have a state we can use for this purpose.
It asynchronously invokes our Lambda function at every loops.
This is the code of the Iterator, adapted from here.
The state machine starts and sets the index at 0 and the count at 6.
Iterator function is invoked.
If the iterator function reached the end of the loop, the IsCountReached state terminates the execution, otherwise the machine waits for 10 seconds.
The machine loops back to the iterator.
Step 3: Create an Amazon CloudWatch Events rule scheduled to trigger every minute and add the state machine as its target. I’ve actually prepared an Amazon CloudFormation template that creates the whole stack and starts the Lambda invocations, you can find it here.
Performance
Let’s have a look at a sample series of invocations and analyse how precise the timing is. In the following chart I reported the delay (in excess of the expected 10-second-wait) of 30 consecutive invocations of my dummy function, when the Iterator is configured with a memory size of 1024MB.
Invocations Delay
Notice the delay increases by a few hundred milliseconds at every invocation. The good news is it accrues only within the same loop, 6 times; after that, a new CloudWatch Events kicks in and it resets.
This delay is due to the work that AWS Step Function does outside of the Wait state, the main component of which is the Iterator function itself, that runs synchronously in the state machine and therefore adds up its duration to the 10-second-wait.
As we can easily imagine, the memory size of the Iterator Lambda function does make a difference. Here are the Average and Maximum duration of the function with 256MB, 512MB, 1GB and 2GB of memory.
Average Duration
Maximum Duration
Given those results, I’d say that a memory of 1024MB is a good compromise between costs and performance.
Caveats
As mentioned, in our Amazon CloudWatch Events documentation, in rare cases a rule can be triggered twice, causing two parallel executions of the state machine. If that is a concern, we can add a task state at the beginning of the state machine that checks if any other executions are currently running. If the outcome is positive, then a choice state can immediately terminate the flow. Since the state machine is invoked every 60 seconds and runs for about 50, it is safe to assume that executions should all be sequential and any parallel executions should be treated as duplicates. The task state that checks for current running executions can be a Lambda function similar to the following:
By continuing to use the site, you agree to the use of cookies. more information
The cookie settings on this website are set to "allow cookies" to give you the best browsing experience possible. If you continue to use this website without changing your cookie settings or you click "Accept" below then you are consenting to this.