Today, we’re excited to announce local build support in AWS CodeBuild.
AWS CodeBuild is a fully managed build service. There are no servers to provision and scale, or software to install, configure, and operate. You just specify the location of your source code, choose your build settings, and CodeBuild runs build scripts for compiling, testing, and packaging your code.
In this blog post, I’ll show you how to set up CodeBuild locally to build and test a sample Java application.
By building an application on a local machine you can:
Test the integrity and contents of a buildspec file locally.
Test and build an application locally before committing.
Identify and fix errors quickly from your local development environment.
Prerequisites
In this post, I am using AWS Cloud9 IDE as my development environment.
If you would like to use AWS Cloud9 as your IDE, follow the express setup steps in the AWS Cloud9 User Guide.
The AWS Cloud9 IDE comes with Docker and Git already installed. If you are going to use your laptop or desktop machine as your development environment, install Docker and Git before you start.
Note: You need to provide three environment variables to the local agent, namely IMAGE_NAME, SOURCE, and ARTIFACTS; an example invocation follows the list below.
IMAGE_NAME: The name of your build environment image.
SOURCE: The absolute path to your source code directory.
ARTIFACTS: The absolute path to your artifact output folder.
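As an illustration, assuming you have a CodeBuild curated build image available locally (for example, a Java 8 image) and have pulled the amazon/aws-codebuild-local agent image from Docker Hub, a typical invocation from an AWS Cloud9 environment might look like the following. The image name and both paths are examples only; substitute your own values.

# Example only: the image name and the SOURCE/ARTIFACTS paths are placeholders for your own build image and directories.
docker run -it -v /var/run/docker.sock:/var/run/docker.sock \
    -e "IMAGE_NAME=aws/codebuild/java:openjdk-8" \
    -e "ARTIFACTS=/home/ec2-user/environment/artifacts" \
    -e "SOURCE=/home/ec2-user/environment/sample-web-app" \
    amazon/aws-codebuild-local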
When you run the sample project, you get a runtime error that says the YAML file does not exist. This is because a buildspec.yml file is not included in the sample web project. AWS CodeBuild requires a buildspec.yml to run a build. For more information about buildspec.yml, see Build Spec Example in the AWS CodeBuild User Guide.
Let’s add a buildspec.yml file with the following content to the sample-web-app folder and then rebuild the project.
version: 0.2
phases:
  build:
    commands:
      - echo Build started on `date`
      - mvn install
artifacts:
  files:
    - target/javawebdemo.war
This time your build should be successful. Upon successful execution, look in the /artifacts folder for the final built artifacts.zip file to validate.
Conclusion
In this blog post, I showed you how to quickly set up the CodeBuild local agent to build projects right from your local desktop machine or laptop. As you see, local builds can improve developer productivity by helping you identify and fix errors quickly.
I hope you found this post useful. Feel free to leave your feedback or suggestions in the comments.
Many companies across the globe use Amazon DynamoDB to store and query historical user-interaction data. DynamoDB is a fast NoSQL database used by applications that need consistent, single-digit millisecond latency.
Often, customers want to turn their valuable data in DynamoDB into insights by analyzing a copy of their table stored in Amazon S3. Doing this separates their analytical queries from their low-latency critical paths. This data can be the primary source for understanding customers’ past behavior, predicting future behavior, and generating downstream business value. Customers often turn to DynamoDB because of its great scalability and high availability. After a successful launch, many customers want to use the data in DynamoDB to predict future behaviors or provide personalized recommendations.
DynamoDB is a good fit for low-latency reads and writes, but it’s not practical to scan all the data in a DynamoDB table to train a model. In this post, I demonstrate how you can use DynamoDB table data copied to Amazon S3 by AWS Data Pipeline to predict customer behavior. I also demonstrate how you can use this data to provide personalized recommendations for customers using Amazon SageMaker. You can also run ad hoc queries against the data using Amazon Athena. DynamoDB recently released on-demand backups, which create full table backups with no performance impact. However, that feature is not suitable for our purposes in this post, so I chose AWS Data Pipeline instead to create managed backups that are accessible from other services.
To do this, I describe how to read the DynamoDB backup file format in Data Pipeline. I also describe how to convert the objects in S3 to a CSV format that Amazon SageMaker can read. In addition, I show how to schedule regular exports and transformations using Data Pipeline. The sample data used in this post is the Bank Marketing Data Set from the UCI Machine Learning Repository.
The solution that I describe provides the following benefits:
Separates analytical queries from production traffic on your DynamoDB table, preserving your DynamoDB read capacity units (RCUs) for important production requests
Automatically updates your model to get real-time predictions
Optimizes for performance (so it doesn’t compete with DynamoDB RCUs after the export) and for cost (using data you already have)
Makes it easier for developers of all skill levels to use Amazon SageMaker
All of the code and the dataset used in this post are available in this .zip file.
Solution architecture
The following diagram shows the overall architecture of the solution.
The steps that data follows through the architecture are as follows:
Data Pipeline regularly copies the full contents of a DynamoDB table as JSON into an S3 bucket.
Exported JSON files are converted to comma-separated value (CSV) format to use as a data source for Amazon SageMaker.
Amazon SageMaker renews the model artifact and updates the endpoint.
The converted CSV is available for ad hoc queries with Amazon Athena.
Data Pipeline controls this flow and repeats the cycle based on the schedule defined by customer requirements.
Building the auto-updating model
This section discusses details about how to read the DynamoDB exported data in Data Pipeline and build automated workflows for real-time prediction with a regularly updated model.
Find the automation_script.sh file and edit it for your environment. For example, you need to replace 's3://<your bucket>/<datasource path>/' with your own S3 path to the data source for Amazon SageMaker. In the script, replace any text enclosed in angle brackets (< and >) with your own values.
Upload the json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar file to your S3 path so that the ADD jar command in Apache Hive can refer to it.
For this solution, the banking.csv file should be imported into a DynamoDB table.
Export a DynamoDB table
To export the DynamoDB table to S3, open the Data Pipeline console and choose the Export DynamoDB table to S3 template. In this template, Data Pipeline creates an Amazon EMR cluster and performs an export in the EMRActivity activity. Set proper intervals for backups according to your business requirements.
One core node (m3.xlarge) provides the default capacity for the EMR cluster and should be suitable for the solution in this post. Leave the option to resize the cluster before running enabled in the TableBackupActivity activity so that Data Pipeline can scale the cluster to match the table size. The process of converting to CSV format and renewing models happens in this EMR cluster.
For a more in-depth look at how to export data from DynamoDB, see Export Data from DynamoDB in the Data Pipeline documentation.
Add the script to an existing pipeline
After you export your DynamoDB table, you add an additional EMR step to EMRActivity by following these steps:
Open the Data Pipeline console and choose the ID for the pipeline that you want to add the script to.
For Actions, choose Edit.
In the editing console, choose the Activities category and add an EMR step using the custom script downloaded in the previous section, as shown below.
Paste the following command into the new step after the data upload step:
The element #{output.directoryPath} references the S3 path where Data Pipeline exports the DynamoDB data as JSON. The path should be passed to the script as an argument.
The bash script has two goals: converting data formats and renewing the Amazon SageMaker model. The following sections discuss the contents of the automation script.
Automation script: Convert JSON data to CSV with Hive
We use Apache Hive to transform the data into a new format. The Hive QL script to create an external table and transform the data is included in the custom script that you added to the Data Pipeline definition.
Run the Hive scripts with the -e option. Also, define the Hive table with the 'org.openx.data.jsonserde.JsonSerDe' row format so that it can parse and read the JSON format. The HiveQL creates an EXTERNAL table that reads the DynamoDB backup data from the S3 path passed in by Data Pipeline.
Note: You should create the table with the “EXTERNAL” keyword to avoid the backup data being accidentally deleted from S3 if you drop the table.
The full automation script for converting follows. Add your own bucket name and data source path in the highlighted areas.
After creating the external table, you need to read its data. You then use the INSERT OVERWRITE DIRECTORY ... SELECT statement to write CSV data to the S3 path that you designated as the data source for Amazon SageMaker.
Depending on your requirements, you can drop or transform columns in the SELECT clause in this step to optimize the data for analysis. For example, you might remove some columns that have unpredictable correlations with the target value, because keeping the wrong columns can expose your model to overfitting during training, which weakens its predictions. In this post, the customer_id column is removed. For more information about overfitting, see the topic Model Fit: Underfitting vs. Overfitting in the Amazon ML documentation.
Automation script: Renew the Amazon SageMaker model
After the CSV data is refreshed and ready to use, create a new model artifact for Amazon SageMaker with the updated dataset on S3. To renew the model artifact, you must run a new training job. Training jobs can be started with the AWS SDK (for example, the Amazon SageMaker client in boto3), with the Amazon SageMaker Python SDK (installable with the pip install sagemaker command), or with the AWS CLI for Amazon SageMaker, which is the approach described in this post.
In addition, consider how to renew your existing model smoothly and without service impact, because your model is called by applications in real time. To do this, create a new endpoint configuration first, and then update the current endpoint with the newly created configuration.
#!/bin/bash
## Define variable
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>"
# Select containers image based on region.
case "$REGION" in
"us-west-2" )
IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
;;
"us-east-1" )
IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest"
;;
"us-east-2" )
IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest"
;;
"eu-west-1" )
IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest"
;;
*)
echo "Invalid Region Name"
exit 1 ;
esac
# Start training job and creating model artifact
TRAINING_JOB_NAME=TRAIN-${DTTIME}
S3OUTPUT="s3://<your bucket name>/model/"
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION} --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE} --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None" }]' --output-data-config S3OutputPath=${S3OUTPUT} --resource-config InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier
# Wait until job completed
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}
# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION} --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME} --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT} --execution-role-arn ${ROLE}
# create a new endpoint configuration
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME} --production-variants VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge
# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ "$STATUS" != "InService" ]] ;
then
aws sagemaker create-endpoint --endpoint-name ServiceEndpoint --endpoint-config-name ${CONFIGNAME} --region ${REGION}
else
aws sagemaker update-endpoint --endpoint-name ServiceEndpoint --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi
Grant permission
Before you execute the script, you must grant proper permission to Data Pipeline. Data Pipeline uses the DataPipelineDefaultResourceRole role by default. I added the following policy to DataPipelineDefaultResourceRole to allow Data Pipeline to create, delete, and update the Amazon SageMaker model and data source in the script.
After you deploy a model into production using Amazon SageMaker hosting services, your client applications use the InvokeEndpoint runtime API to get inferences from the model hosted at the specified endpoint. This approach is useful for interactive web, mobile, or desktop applications.
The following is a simple Python code example that invokes the Amazon SageMaker endpoint by name (“ServiceEndpoint”) and uses the response for real-time prediction.
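This is a minimal sketch using the boto3 SageMaker runtime client. The endpoint name matches the one created by the automation script above, but the feature values in the payload are placeholders; supply a CSV record with the same 20 features, in the same order, that the model was trained on.

import boto3

# Minimal sketch: invoke the "ServiceEndpoint" endpoint created by the automation script.
# The feature values below are placeholders for the 20 training features.
runtime = boto3.client('sagemaker-runtime')

payload = '56,1,1,1,0,1,0,1,1,999,0,1,1.1,93.994,-36.4,4.857,5191.0,0,0,1'

response = runtime.invoke_endpoint(
    EndpointName='ServiceEndpoint',
    ContentType='text/csv',
    Body=payload)

print(response['Body'].read().decode('utf-8'))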
Data Pipeline exports DynamoDB table data into S3. The original JSON data should be kept so that you can recover the table in the rare event that this is needed. Data Pipeline then converts the JSON to CSV so that Amazon SageMaker can read the data. Note: You should select only meaningful attributes when you convert to CSV. For example, if you judge that the “campaign” attribute is not correlated, you can eliminate this attribute from the CSV.
Train the Amazon SageMaker model with the new data source.
When a new customer comes to your site, you can judge how likely it is for this customer to subscribe to your new product based on “predictedScores” provided by Amazon SageMaker.
If the new user subscribes to your new product, your application must update the attribute “y” to the value 1 (for yes). This updated data is provided for the next model renewal as a new data source, and it serves to improve the accuracy of your prediction. With each new entry, your application can become smarter and deliver better predictions.
Running ad hoc queries using Amazon Athena
Amazon Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL. Athena is useful for examining data and collecting statistics or informative summaries about data. You can also use the powerful analytic functions of Presto, as described in the topic Aggregate Functions of Presto in the Presto documentation.
With the Data Pipeline scheduled activity, recent CSV data is always located in S3 so that you can run ad hoc queries against the data using Amazon Athena. I show this with example SQL statements following. For an in-depth description of this process, see the post Interactive SQL Queries for Data in Amazon S3 on the AWS News Blog.
Creating an Amazon Athena table and running queries
You can simply create an EXTERNAL table for the CSV data on S3 in the Amazon Athena console.
=== Table Creation ===
CREATE EXTERNAL TABLE datasource (
age int,
job string,
marital string ,
education string,
default string,
housing string,
loan string,
contact string,
month string,
day_of_week string,
duration int,
campaign int,
pdays int ,
previous int ,
poutcome string,
emp_var_rate double,
cons_price_idx double,
cons_conf_idx double,
euribor3m double,
nr_employed double,
y int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n'
LOCATION 's3://<your bucket name>/<datasource path>/';
The following query calculates the correlation coefficient between the target attribute and other attributes using Amazon Athena.
=== Sample Query ===
SELECT corr(age,y) AS correlation_age_and_target,
corr(duration,y) AS correlation_duration_and_target,
corr(campaign,y) AS correlation_campaign_and_target,
corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y ,
CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact
FROM datasource
) datasource ;
Conclusion
In this post, I showed an example of how to analyze DynamoDB data by using a copy of the table data in Amazon S3, which preserves your DynamoDB table’s read capacity. You can then use the analyzed data as a new data source to train an Amazon SageMaker model for accurate real-time prediction. In addition, you can run ad hoc queries against the data on S3 using Amazon Athena. I also showed how to automate these procedures by using Data Pipeline.
You can adapt this example to your specific use case, and hopefully this post helps you accelerate your development. You can find more examples and use cases for Amazon SageMaker in the AWS re:Invent 2017 video Introducing Amazon SageMaker on the AWS website.
Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to data/databases and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”
With the explosion in virtual reality (VR) technologies over the past few years, we’ve had an increasing number of customers ask us for advice and best practices around deploying their VR-based products and service offerings on the AWS Cloud. It soon became apparent that while the VR ecosystem is large in both scope and depth of types of workloads (gaming, e-medicine, security analytics, live streaming events, etc.), many of the workloads followed repeatable patterns, with storage and delivery of live and on-demand immersive video at the top of the list.
Looking at consumer trends, the desire for live and on-demand immersive video is fairly self-explanatory. VR has ushered in convenient and low-cost access for consumers and businesses to a wide variety of options for consuming content, ranging from browser playback of live and on-demand 360º video, all the way up to positional tracking systems with a high degree of immersion. All of these scenarios contain one lowest common denominator: video.
Which brings us to the topic of this post. We set out to build a solution that could support both live and on-demand events, bring with it a high degree of scalability, be flexible enough to support transformation of video if required, run at a low cost, and use open-source software to every extent possible.
In this post, we describe the reference architecture we created to solve this challenge, using Amazon EC2 Spot Instances, Amazon S3, Elastic Load Balancing, Amazon CloudFront, AWS CloudFormation, and Amazon CloudWatch, with open-source software such as NGINX, FFMPEG, and JavaScript-based client-side playback technologies. We step you through deployment of the solution and how the components work, as well as the capture, processing, and playback of the underlying live and on-demand immersive media streams.
This GitHub repository includes the source code necessary to follow along. We’ve also provided a self-paced workshop from AWS re:Invent 2017 that breaks down this architecture even further. If you experience any issues or would like to suggest an enhancement, please use the GitHub issue tracker.
Prerequisites
You’ll also need a few additional components to take best advantage of the infrastructure:
A camera/capture device capable of encoding and streaming RTMP video
A browser to consume the content.
You’re going to generate HTML5-compatible video (Apple HLS to be exact), but there are many other native iOS and Android options for consuming the media that you create. It’s also worth noting that your playback device should support projection of your input stream. We’ll talk more about that in the next section.
How does immersive media work?
At its core, any flavor of media, be that audio or video, can be viewed with some level of immersion. The ability to interact passively or actively with the content brings with it a further level of immersion. When you look at VR devices with rotational and positional tracking, you naturally need more than an ability to interact with a flat plane of video. The challenge for any creative thus becomes a tradeoff between immersion features (degrees of freedom, monoscopic 2D or stereoscopic 3D, resolution, framerate) and overall complexity.
Where can you start from a simple and effective point that lets you build out a fairly modular solution and test it? There are a few areas where we chose to be prescriptive with our solution.
Source capture from the Ricoh Theta S
First, monoscopic 360-degree video is currently one of the most commonly consumed formats on consumer devices. We explicitly chose to focus on this format, although the infrastructure is not limited to it. More on this later.
Second, if you look at most consumer-level cameras that provide live streaming ability, and even many professional rigs, there are at least two lenses or cameras at a minimum. The figure above illustrates a single capture from a Ricoh Theta S in monoscopic 2D. The left image captures 180 degrees of the field of view, and the right image captures the other 180 degrees.
For this post, we chose a typical midlevel camera (the Ricoh Theta S), and used a laptop with open-source software (Open Broadcaster Software) to encode and stream the content. Again, the solution infrastructure is not limited to this particular brand of camera. Any camera or encoder that outputs 360º video and encodes to H264+AAC with an RTMP transport will work.
Third, capturing and streaming multiple camera feeds brings additional requirements around stream synchronization and cost of infrastructure. There is also a requirement to stitch media in real time, which can be CPU and GPU-intensive. Many devices and platforms do this either on the device, or via outboard processing that sits close to the camera location. If you stitch and deliver a single stream, you can save the costs of infrastructure and bitrate/connectivity requirements. We chose to keep these aspects on the encoder side to save on cost and reduce infrastructure complexity.
Last, the most common delivery format that requires little to no processing on the infrastructure side is equirectangular projection, as per the above figure. By stitching and unwrapping the spherical coordinates into a flat plane, you can easily deliver the video exactly as you would with any other live or on-demand stream. The only caveat is that resolution and bit rate are of utmost importance. The higher you can push these (high bit rate @ 4K resolution), the more immersive the experience is for viewers. This is due to the increase in sharpness and reduction of compression artifacts.
Knowing that we would be transcoding potentially at 4K on the source camera, but in a format that could be transmuxed without an encoding penalty on the origin servers, we implemented a pass-through for the highest bit rate, and elected to only transcode lower bitrates. This requires some level of configuration on the source encoder, but saves on cost and infrastructure. Because you can conform the source stream, you may as well take advantage of that!
For this post, we chose not to focus on ways to optimize projection. However, the reference architecture does support this with additional open source components compiled into the FFMPEG toolchain. A number of options are available to this end, such as open source equirectangular to cubic transformation filters. There is a tradeoff, however, in that reprojection implies that all streams must be transcoded.
Processing and origination stack
To get started, we’ve provided a CloudFormation template that you can launch directly into your own AWS account. We quickly review how it works, the solution’s components, key features, processing steps, and examine the main configuration files. Following this, you launch the stack, and then proceed with camera and encoder setup.
Immersive streaming reference architecture
The event encoder publishes the RTMP source to multiple origin elastic IP addresses for packaging into the HLS adaptive bitrate.
The client requests the live stream through the CloudFront CDN.
The origin responds with the appropriate HLS stream.
The edge fleet caches media requests from clients and elastically scales across both Availability Zones to meet peak demand.
CloudFront caches media at local edge PoPs to improve performance for users and reduce the origin load.
When the live event is finished, the VOD asset is published to S3. An S3 event is then published to SQS.
The encoding fleet reads messages from the SQS queue, processes the VOD clips, and stores them in the S3 bucket.
How it works
A camera captures content, and with the help of a contribution encoder, publishes a live stream in equirectangular format. The stream is encoded at a high bit rate (at least 2.5 Mbps, but typically 16+ Mbps for 4K) using H264 video and AAC audio compression codecs, and delivered to a primary origin via the RTMP protocol. Streams may transit over the internet or dedicated links to the origins. Typically, for live events in the field, internet or bonded cellular are the most widely used.
The encoder is typically configured to push the live stream to a primary URI, with the ability (depending on the source encoding software/hardware) to roll over to a backup publishing point origin if the primary fails. Because you run across multiple Availability Zones, this architecture could handle an entire zone outage with minor disruption to live events. The primary and backup origins handle the ingestion of the live stream as well as transcoding to H264+AAC-based adaptive bit rate sets. After transcode, they package the streams into HLS for delivery and create a master-level manifest that references all adaptive bit rates.
The edge cache fleet pulls segments and manifests from the active origin on demand, and supports failover from primary to backup if the primary origin fails. By adding this caching tier, you effectively separate the encoding backend tier from the cache tier that responds to client or CDN requests. In addition to origin protection, this separation allows you to independently monitor, configure, and scale these components.
Viewers can use the sample HTML5 player (or compatible desktop, iOS or Android application) to view the streams. Navigation in the 360-degree view is handled either natively via device-based gyroscope, positionally via more advanced devices such as a head mount display, or via mouse drag on the desktop. Adaptive bit rate is key here, as this allows you to target multiple device types, giving the player on each device the option of selecting an optimum stream based on network conditions or device profile.
Solution components
When you deploy the CloudFormation template, all the architecture services referenced above are created and launched. This includes:
The compute tier running on Spot Instances for the corresponding components:
the primary and backup ingest origins
the edge cache fleet
the transcoding fleet
the test source
The CloudFront distribution
S3 buckets for storage of on-demand VOD assets
An Application Load Balancer for load balancing the service
An Amazon ECS cluster and container for the test source
The template also provisions the underlying dependencies:
A VPC
Security groups
IAM policies and roles
Elastic network interfaces
Elastic IP addresses
The edge cache fleet instances need some way to discover the primary and backup origin locations. You use elastic network interfaces and elastic IP addresses for this purpose.
As each component of the infrastructure is provisioned, software required to transcode and process the streams across the Spot Instances is automatically deployed. This includes NGiNX-RTMP for ingest of live streams, FFMPEG for transcoding, NGINX for serving, and helper scripts to handle various tasks (potential Spot Instance interruptions, queueing, moving content to S3). Metrics and logs are available through CloudWatch and you can manage the deployment using the CloudFormation console or AWS CLI.
Key features include:
Live and video-on-demand recording
The solution supports both live streaming and on-demand recording. On-demand content is created automatically when the encoder stops publishing to the origin.
Cost-optimization and operating at scale using Spot Instances
Spot Instances are used exclusively for infrastructure to optimize cost and scale throughput.
Midtier caching
To protect the origin servers, the midtier cache fleet pulls, caches, and delivers to downstream CDNs.
Distribution via CloudFront or multi-CDN
The Application Load Balancer endpoint allows CloudFront or any third-party CDN to source content from the edge fleet and, indirectly, the origin.
FFMPEG + NGINX + NGiNX-RTMP
These three components form the core of the stream ingest, transcode, packaging, and delivery infrastructure, as well as the VOD-processing component for creating transcoded VOD content on-demand.
Simple deployment using a CloudFormation template
All infrastructure can be easily created and modified using CloudFormation.
Prototype player page
To provide an end-to-end experience right away, we’ve included a test player page hosted as a static site on S3. This page uses A-Frame, a cross-platform, open-source framework for building VR experiences in the browser. Though A-Frame provides many features, it’s used here to render a sphere that acts as a 3D canvas for your live stream.
Spot Instance considerations
At this stage, and before we discuss processing, it is important to understand how the architecture operates with Spot Instances.
Spot Instances are spare compute capacity in the AWS Cloud available to you at steep discounts compared to On-Demand prices. Spot Instances enable you to optimize your costs on the AWS Cloud and scale your application’s throughput up to 10X for the same budget. By selecting Spot Instances, you can save up to 90% compared to On-Demand prices. This greatly reduces the cost of running the solution because, outside of S3 for storage and CloudFront for delivery, the solution is almost entirely dependent on Spot Instances for its infrastructure requirements.
We also know that customers running events look to deploy streaming infrastructure at the lowest price point, so it makes sense to take advantage of it wherever possible. A potential challenge when using Spot Instances for live streaming and on-demand processing is that you need to proactively deal with potential Spot Instance interruptions. How can you best deal with this?
First, the origin is deployed in a primary/backup deployment. If a Spot Instance interruption happens on the primary origin, you can fail over to the backup with a brief interruption. Should a potential interruption not be acceptable, then either Reserved Instances or On-Demand options (or a combination) can be used at this tier.
Second, the edge cache fleet runs a job (started automatically at system boot) that periodically queries the local instance metadata to detect if an interruption is scheduled to occur. Spot Instance Interruption Notices provide a two-minute warning of a pending interruption. If you poll every 5 seconds, you have almost 2 full minutes to detach from the Load Balancer and drain or stop any traffic directed to your instance.
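As an illustration only (the actual job shipped with the CloudFormation template may differ), a minimal polling loop along these lines could watch the instance metadata and deregister the instance from its target group when a notice appears. The target group ARN and region are placeholders.

#!/bin/bash
# Illustrative sketch: poll the Spot interruption notice every 5 seconds.
# The target group ARN and region below are placeholders; replace them with your own values.
TARGET_GROUP_ARN="<your-target-group-arn>"
REGION="us-west-2"
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)

while true; do
  # The termination-time endpoint returns 404 until an interruption is scheduled.
  if curl -sf http://169.254.169.254/latest/meta-data/spot/termination-time > /dev/null; then
    # Interruption pending: detach from the load balancer so traffic drains cleanly.
    aws elbv2 deregister-targets --region "$REGION" \
      --target-group-arn "$TARGET_GROUP_ARN" --targets Id="$INSTANCE_ID"
    break
  fi
  sleep 5
done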
Lastly, use an SQS queue when transcoding. If a transcode for a Spot Instance is interrupted, the stale item falls back into the SQS queue and is eventually re-surfaced into the processing pipeline. Only remove items from the queue after the transcoded files have been successfully moved to the destination S3 bucket.
Processing
As discussed in the previous sections, you pass through the highest bit rate video rather than transcoding it, which saves you from having to increase the instance size to transcode 4K or similar high bit rate or high resolution content.
We’ve selected a handful of bitrates for the adaptive bit rate stack. You can customize any of these to suit the requirements for your event. The default ABR stack includes:
2160p (4K)
1080p
540p
480p
These can be modified by editing the /etc/nginx/rtmp.d/rtmp.conf NGINX configuration file on the origin or the CloudFormation template.
It’s important to understand where and how streams are transcoded. When the source high bit rate stream enters the primary or backup origin at the /live RTMP application entry point, it is recorded on stop and start of publishing. On completion, it is moved to S3 by a cleanup script, and a message is placed in your SQS queue for workers to use. These workers transcode the media and push it to a playout location bucket.
This solution uses Spot Fleet with automatic scaling to drive the fleet size. You can customize it based on CloudWatch metrics, such as simple utilization metrics to drive the size of the fleet. Why use Spot Instances for the transcode option instead of Amazon Elastic Transcoder? This allows you to implement reprojection of the input stream via FFMPEG filters in the future.
The origins handle all the heavy live streaming work. Edges only store and forward the segments and manifests, and provide scaling plus reduction of burden on the origin. This lets you customize the origin to the right compute capacity without having to rely on a ‘high watermark’ for compute sizing, thus saving additional costs.
Loopback is an important concept for the live origins. The incoming stream entering /live is transcoded by FFMPEG to multiple bit rates, which are streamed back to the same host via RTMP, on a secondary publishing point /show. The secondary publishing point is transparent to the user and encoder, but handles HLS segment generation and cleanup, and keeps a sliding window of live segments and constantly updating manifests.
Configuration
Our solution provides two key points of configuration that can be used to customize the solution to accommodate ingest, recording, transcoding, and delivery, all controlled via origin and edge configuration files, which are described later. In addition, a number of job scripts run on the instances to provide hooks into Spot Instance interruption events and the VOD SQS-based processing queue.
Origin instances
The rtmp.conf excerpt below also shows additional parameters that can be customized, such as maximum recording file size in Kbytes, HLS Fragment length, and Playlist sizes. We’ve created these in accordance with general industry best practices to ensure the reliable streaming and delivery of your content.
rtmp {
    server {
        listen 1935;
        chunk_size 4000;

        application live {
            live on;
            record all;
            record_path /var/lib/nginx/rec;
            record_max_size 128000K;
            exec_record_done /usr/local/bin/record-postprocess.sh $path $basename;
            exec /usr/local/bin/ffmpeg <…parameters…>;
        }

        application show {
            live on;
            hls on;
            ...
            hls_type live;
            hls_fragment 10s;
            hls_playlist_length 60s;
            ...
        }
    }
}
This exposes a few URL endpoints for debugging and general status. In production, you would most likely turn these off:
/stat provides a statistics endpoint accessible via any standard web browser.
/control enables control of RTMP streams and publishing points.
You also control the TTLs, as previously discussed. It’s important to note here that you are setting TTLs explicitly at the origin, instead of in CloudFront’s distribution configuration. While both are valid, this approach allows you to reconfigure and restart the service on the fly without having to push changes through CloudFront. This is useful for debugging any caching or playback issues.
record-postprocess.sh – Ensures that recorded files on the origin are well-formed, and transfers them to S3 for processing.
ffmpeg.sh – Transcodes content on the encoding fleet, pulling source media from your S3 ingress bucket, based on SQS queue entries, and pushing transcoded adaptive bit rate segments and manifests to your VOD playout egress bucket.
For more details, see the Delivery and Playback section later in this post.
Camera source
With the processing and origination infrastructure running, you need to configure your camera and encoder.
As discussed, we chose to use a Ricoh Theta S camera and Open Broadcaster Software (OBS) to stitch and deliver a stream into the infrastructure. Ricoh provides a free ‘blender’ driver, which allows you to transform, stitch, encode, and deliver both transformed equirectangular (used for this post) video and spherical (two camera) video. The Theta provides an inexpensive way (under $300) to start capturing, and OBS is a free, open-source software application for capturing and live streaming on a budget. OBS is quick and cheap, enjoys wide use in the gaming community, and lowers the barrier to getting started with immersive streaming.
While the resolution and bit rate of the Theta may not be 4K, it still provides us with a way to test the functionality of the entire pipeline end to end, without having to invest in a more expensive camera rig. One could also use this type of model to target smaller events, which may involve mobile devices with smaller display profiles, such as phones and potentially smaller sized tablets.
Looking for a more professional solution? Nokia, GoPro, Samsung, and many others have options ranging from $500 to $50,000. This solution is based around the Theta S capabilities, but we’d encourage you to extend it to meet your specific needs.
If your device can support equirectangular RTMP, then it can deliver media through the reference architecture (dependent on instance sizing for higher bit rate sources, of course). If additional features are required such as camera stitching, mixing, or device bonding, we’d recommend exploring a commercial solution such as Teradek Sphere.
Teradek Rig (Teradek)
Ricoh Theta (CNET)
All cameras have varied PC connectivity support. We chose the Ricoh Theta S due to the real-time video connectivity that it provides through software drivers on macOS and PC. If you plan to purchase a camera to use with a PC, confirm that it supports real-time capabilities as a peripheral device.
Encoding and publishing
Now that you have a camera, encoder, and AWS stack running, you can finally publish a live stream.
To start streaming with OBS, configure the source camera and set a publishing point. Use the RTMP application name /live on port 1935 to ingest into the primary origin’s Elastic IP address provided as the CloudFormation output: primaryOriginElasticIp.
You also need to choose a stream name or stream key in OBS. You can use any stream name, but keep the naming short and lowercase, and use only alphanumeric characters. This avoids any parsing issues on client-side player frameworks. There’s no publish point protection in your deployment, so any stream key works with the default NGiNX-RTMP configuration. For more information about stream keys, publishing point security, and extending the NGiNX-RTMP module, see the NGiNX-RTMP Wiki.
You should end up with a configuration similar to the following:
OBS Stream Settings
The Output settings dialog allows us to rescale the Video canvas and encode it for delivery to our AWS infrastructure. In the dialog below, we’ve set the Theta to encode at 5 Mbps in CBR mode using a preset optimized for low CPU utilization. We chose these settings in accordance with best practices for the stream pass-through at the origin for the initial incoming bit rate. You may notice that they largely match the FFMPEG encoding settings we use on the origin – namely constant bit rate, a single audio track, and x264 encoding with the ‘veryfast’ encoding profile.
OBS Output Settings
Live to On-Demand
As you may have noticed, an on-demand component is included in the solution architecture. When talking to customers, one frequent request that we see is that they would like to record the incoming stream with as little effort as possible.
NGINX-RTMP’s recording directives provide an easy way to accomplish this. We record any newly published stream on stream start at the primary or backup origins, using the incoming source stream, which also happens to be the highest bit rate. When the encoder stops broadcasting, NGINX-RTMP executes an exec_record_done script – record-postprocess.sh (described in the Configuration section earlier), which ensures that the content is well-formed, and then moves it to an S3 ingest bucket for processing.
Transcoding of content to make it ready for VOD as adaptive bit rate is a multi-step pipeline. First, Spot Instances in the transcoding cluster periodically poll the SQS queue for new jobs. Items on the queue are pulled off on demand by processing instances, and transcoded via FFMPEG into adaptive bit rate HLS. This allows you to also extend FFMPEG using filters for cubic and other bitrate-optimizing 360-specific transforms. Finally, transcoded content is moved from the ingest bucket to an egress bucket, making them ready for playback via your CloudFront distribution.
Separate ingest and egress by bucket to provide hard security boundaries between source recordings (which are highest quality and unencrypted), and destination derivatives (which may be lower quality and potentially require encryption). Bucket separation also allows you to order and archive input and output content using different taxonomies, which is common when moving content from an asset management and archival pipeline (the ingest bucket) to a consumer-facing playback pipeline (the egress bucket, and any other attached infrastructure or services, such as CMS, Mobile applications, and so forth).
Because streams are pushed over the internet, there is always the chance that an interruption could occur in the network path, or even at the origin side of the equation (primary to backup roll-over). Both of these scenarios could result in malformed or partial recordings being created. For the best level of reliability, encoding should always be recorded locally on-site as a precaution to deal with potential stream interruptions.
Delivery and playback
With the camera turned on and OBS streaming to AWS, the final step is to play the live stream. We’ve primarily tested the prototype player on the latest Chrome and Firefox browsers on macOS, so your mileage may vary on different browsers or operating systems. For those looking to try the livestream on Google Cardboard, or similar headsets, native apps for iOS (VRPlayer) and Android exist that can play back HLS streams.
The prototype player is hosted in an S3 bucket and can be found from the CloudFormation output clientWebsiteUrl. It requires a stream URL provided as a query parameter ?url=<stream_url> to begin playback. This stream URL is determined by the RTMP stream configuration in OBS. For example, if OBS is publishing to rtmp://x.x.x.x:1935/live/foo, the resulting playback URL would be:
https://<cloudFrontDistribution>/hls/foo.m3u8
The combined player URL and playback URL results in a path like this one:
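For example, combining the prototype player URL from the clientWebsiteUrl output with the playback URL above would give you something like the following (both values are placeholders from your own stack outputs):

<clientWebsiteUrl>/?url=https://<cloudFrontDistribution>/hls/foo.m3u8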
To assist in setup/debugging, we’ve provided a test source as part of the CloudFormation template. A color bar pattern with timecode and audio is being generated by FFmpeg running as an ECS task. Much like OBS, FFmpeg is streaming the test pattern to the primary origin over the RTMP protocol. The prototype player and test HLS stream can be accessed by opening the clientTestPatternUrl CloudFormation output link.
Test Stream Playback
What’s next?
In this post, we walked you through the design and implementation of a full end-to-end immersive streaming solution architecture. As you may have noticed, there are a number of areas this could expand into, and we intend to do this in follow-up posts around the topic of virtual reality media workloads in the cloud. We’ve identified a number of topics such as load testing, content protection, client-side metrics and analytics, and CI/CD infrastructure for 24/7 live streams. If you have any requests, please drop us a line.
We would like to extend extra-special thanks to Scott Malkie and Chad Neal for their help and contributions to this post and reference architecture.
I know from first-hand experience that the FBI is corrupt. In 2007, they threatened me, trying to get me to cancel a talk that revealed security vulnerabilities in a large corporation’s product. Such abuses occur because there is no transparency and oversight. FBI agents write down our conversations in their little notebooks instead of recording them, so that they can control the narrative of what happened, presenting their version of the conversation (leaving out the threats). In this day and age of recording devices, this is indefensible.
She writes, “I know firsthand that it’s difficult to get a FISA warrant.” Yes, the process was difficult for her, an underling, to get a FISA warrant. The process is different when a leader tries to do the same thing.
I know this first-hand, having casually worked as an outsider with intelligence agencies. I saw two processes in place: one for the flunkies, and one for those above the system. The flunkies constantly complained about how much process was in place oppressing them, preventing them from getting their jobs done. The leaders understood the system and how to sidestep those processes.
That’s not to say the Nunes Memo has merit, but it does point out that privacy advocates have a point in wanting more oversight and transparency in such surveillance of American citizens.
Blaming us privacy advocates isn’t the way to go. It’s not going to succeed in tarnishing us, but will push us more into Trump’s camp, causing us to reiterate that we believe the FBI and FISA are corrupt.
We are excited to announce AWS Glue support for running ETL (extract, transform, and load) scripts in Scala. Scala lovers can rejoice because they now have one more powerful tool in their arsenal. Scala is the native language for Apache Spark, the underlying engine that AWS Glue offers for performing data transformations.
Beyond its elegant language features, writing Scala scripts for AWS Glue has two main advantages over writing scripts in Python. First, Scala is faster for custom transformations that do a lot of heavy lifting because there is no need to shovel data between Python and Apache Spark’s Scala runtime (that is, the Java virtual machine, or JVM). You can build your own transformations or invoke functions in third-party libraries. Second, it’s simpler to call functions in external Java class libraries from Scala because Scala is designed to be Java-compatible. It compiles to the same bytecode, and its data structures don’t need to be converted.
To illustrate these benefits, we walk through an example that analyzes a recent sample of the GitHub public timeline available from the GitHub archive. This site is an archive of public requests to the GitHub service, recording more than 35 event types ranging from commits and forks to issues and comments.
This post shows how to build an example Scala script that identifies highly negative issues in the timeline. It pulls out issue events in the timeline sample, analyzes their titles using the sentiment prediction functions from the Stanford CoreNLP libraries, and surfaces the most negative issues.
Getting started
Before we start writing scripts, we use AWS Glue crawlers to get a sense of the data—its structure and characteristics. We also set up a development endpoint and attach an Apache Zeppelin notebook, so we can interactively explore the data and author the script.
Crawl the data
The dataset used in this example was downloaded from the GitHub archive website into our sample dataset bucket in Amazon S3, and copied to the following locations:
Choose the best folder by replacing <region> with the region that you’re working in, for example, us-east-1. Crawl this folder, and put the results into a database named githubarchive in the AWS Glue Data Catalog, as described in the AWS Glue Developer Guide. This folder contains 12 hours of the timeline from January 22, 2017, and is organized hierarchically (that is, partitioned) by year, month, and day.
When finished, use the AWS Glue console to navigate to the table named data in the githubarchive database. Notice that this data has eight top-level columns, which are common to each event type, and three partition columns that correspond to year, month, and day.
Choose the payload column, and you will notice that it has a complex schema—one that reflects the union of the payloads of event types that appear in the crawled data. Also note that the schema that crawlers generate is a subset of the true schema because they sample only a subset of the data.
Set up the library, development endpoint, and notebook
Next, you need to download and set up the libraries that estimate the sentiment in a snippet of text. The Stanford CoreNLP libraries contain a number of human language processing tools, including sentiment prediction.
Download the Stanford CoreNLP libraries. Unzip the .zip file, and you’ll see a directory full of jar files. For this example, the following jars are required:
stanford-corenlp-3.8.0.jar
stanford-corenlp-3.8.0-models.jar
ejml-0.23.jar
Upload these files to an Amazon S3 path that is accessible to AWS Glue so that it can load these libraries when needed. For this example, they are in s3://glue-sample-other/corenlp/.
Development endpoints are static Spark-based environments that can serve as the backend for data exploration. You can attach notebooks to these endpoints to interactively send commands and explore and analyze your data. These endpoints have the same configuration as that of AWS Glue’s job execution system. So, commands and scripts that work there also work the same when registered and run as jobs in AWS Glue.
To set up an endpoint and a Zeppelin notebook to work with that endpoint, follow the instructions in the AWS Glue Developer Guide. When you are creating an endpoint, be sure to specify the locations of the previously mentioned jars in the Dependent jars path as a comma-separated list. Otherwise, the libraries will not be loaded.
After you set up the notebook server, go to the Zeppelin notebook by choosing Dev Endpoints in the left navigation pane on the AWS Glue console. Choose the endpoint that you created. Next, choose the Notebook Server URL, which takes you to the Zeppelin server. Log in using the notebook user name and password that you specified when creating the notebook. Finally, create a new note to try out this example.
Each notebook is a collection of paragraphs, and each paragraph contains a sequence of commands and the output for that command. Moreover, each notebook includes a number of interpreters. If you set up the Zeppelin server using the console, the (Python-based) pyspark and (Scala-based) spark interpreters are already connected to your new development endpoint, with pyspark as the default. Therefore, throughout this example, you need to prepend %spark at the top of your paragraphs. In this example, we omit these for brevity.
Working with the data
In this section, we use AWS Glue extensions to Spark to work with the dataset. We look at the actual schema of the data and filter out the interesting event types for our analysis.
Start with some boilerplate code to import libraries that you need:
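These mirror the AWS Glue and Spark imports used by the full job script at the end of this post; the notebook session does not need the job-only helpers such as GlueArgParser and Job.

import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.types._          // ScalarNode and the other DynamicRecord value types
import com.amazonaws.services.glue.util.JsonOptions // used later when writing output to S3
import org.apache.spark.SparkContext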
Then, create the Spark and AWS Glue contexts needed for working with the data:
@transient val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)
You need the @transient annotation on the SparkContext when working in Zeppelin; otherwise, you run into a serialization error when executing commands.
Dynamic frames
This section shows how to create a dynamic frame that contains the GitHub records in the table that you crawled earlier. A dynamic frame is the basic data structure in AWS Glue scripts. It is like an Apache Spark data frame, except that it is designed and optimized for data cleaning and transformation workloads. A dynamic frame is well-suited for representing semi-structured datasets like the GitHub timeline.
A dynamic frame is a collection of dynamic records. In Spark lingo, it is an RDD (resilient distributed dataset) of DynamicRecords. A dynamic record is a self-describing record. Each record encodes its columns and types, so every record can have a schema that is unique from all others in the dynamic frame. This is convenient and often more efficient for datasets like the GitHub timeline, where payloads can vary drastically from one event type to another.
The following creates a dynamic frame, github_events, from your table:
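Mirroring the full script at the end of this post, the interactive version looks like this:

// Read the crawled table from the Data Catalog as a dynamic frame.
val github_events = glueContext
  .getCatalogSource(database = "githubarchive", tableName = "data")
  .getDynamicFrame()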
The getCatalogSource() method returns a DataSource, which represents a particular table in the Data Catalog. The getDynamicFrame() method returns a dynamic frame from the source.
Recall that the crawler created a schema from only a sample of the data. You can scan the entire dataset, count the rows, and print the complete schema as follows:
github_events.count
github_events.printSchema()
The result looks like the following:
The data has 414,826 records. As before, notice that there are eight top-level columns, and three partition columns. If you scroll down, you’ll also notice that the payload is the most complex column.
Run functions and filter records
This section describes how you can create your own functions and invoke them seamlessly to filter records. Unlike filtering with Python lambdas, Scala scripts do not need to convert records from one language representation to another, thereby reducing overhead and running much faster.
Let’s create a function that picks only the IssuesEvents from the GitHub timeline. These events are generated whenever someone posts an issue for a particular repository. Each GitHub event record has a field, “type”, that indicates the kind of event it is. The issueFilter() function returns true for records that are IssuesEvents.
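A sketch of such a filter follows; the exact predicate in your script may differ, but the shape is the same: look up the “type” field and compare it to the IssuesEvent event type.

// Sketch: check that the "type" field exists and is a String before comparing it.
def issueFilter(rec: DynamicRecord): Boolean = {
  rec.getField("type").exists {
    case s: String => s == "IssuesEvent"
    case _         => false
  }
}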
Note that the getField() method returns an Option[Any] type, so you first need to check that it exists before checking the type.
You pass this function to the filter transformation, which applies the function on each record and returns a dynamic frame of those records that pass.
val issue_events = github_events.filter(issueFilter)
Now, let’s look at the size and schema of issue_events.
issue_events.count
issue_events.printSchema()
It’s much smaller (14,063 records), and the payload schema is less complex, reflecting only the schema for issues. Keep a few essential columns for your analysis, and drop the rest using the ApplyMapping() transform:
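A sketch of this mapping follows. The payload field names here are assumptions based on the crawled schema, so adjust them to match what your crawler produced; each tuple gives the source field and type on the left, and the target field and type on the right.

// Field names below are assumptions from the crawled schema; keep only what the analysis needs.
val issue_titles = issue_events.applyMapping(Seq(
  ("id", "string", "id", "string"),
  ("actor.login", "string", "actor", "string"),
  ("repo.name", "string", "repo", "string"),
  ("payload.action", "string", "action", "string"),
  ("payload.issue.title", "string", "title", "string")))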
The ApplyMapping() transform is quite handy for renaming columns, casting types, and restructuring records. The preceding code snippet tells the transform to select the fields (or columns) that are enumerated in the left half of the tuples and map them to the fields and types in the right half.
Estimating sentiment using Stanford CoreNLP
To focus on the most pressing issues, you might want to isolate the records with the most negative sentiments. The Stanford CoreNLP libraries are Java-based and offer sentiment-prediction functions. Accessing these functions through Python is possible, but quite cumbersome. It requires creating Python surrogate classes and objects for those found on the Java side. Instead, with Scala support, you can use those classes and objects directly and invoke their methods. Let’s see how.
First, import the libraries needed for the analysis:
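These are the same Stanford CoreNLP and Java imports that appear in the full script at the end of this post:

import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._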
The Stanford CoreNLP libraries have a main driver that orchestrates all of their analysis. The driver setup is heavyweight, setting up threads and data structures that are shared across analyses. Apache Spark runs on a cluster with a main driver process and a collection of backend executor processes that do most of the heavy sifting of the data.
The Stanford CoreNLP shared objects are not serializable, so they cannot be distributed easily across a cluster. Instead, you need to initialize them once for every backend executor process that might need them. Here is how to accomplish that:
val props = new Properties()
props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
props.setProperty("parse.maxlen", "70")
object myNLP {
lazy val coreNLP = new StanfordCoreNLP(props)
}
The properties tell the libraries which annotators to execute and how many words to process. The preceding code creates an object, myNLP, with a field coreNLP that is lazily evaluated. This field is initialized only when it is needed, and only once. So, when the backend executors start processing the records, each executor initializes the driver for the Stanford CoreNLP libraries only one time.
Next is a function that estimates the sentiment of a text string. It first calls Stanford CoreNLP to annotate the text. Then, it pulls out the sentences and takes the average sentiment across all the sentences. The sentiment is a double, from 0.0 as the most negative to 4.0 as the most positive.
Now, let’s estimate the sentiment of the issue titles and add that computed field as part of the records. You can accomplish this with the map() method on dynamic frames:
val issue_sentiments = issue_titles.map((rec: DynamicRecord) => {
val mbody = rec.getField("title")
mbody match {
case Some(mval: String) => {
rec.addField("sentiment", ScalarNode(estimatedSentiment(mval)))
rec }
case _ => rec
}
})
The map() method applies the user-provided function on every record. The function takes a DynamicRecord as an argument and returns a DynamicRecord. The preceding code computes the sentiment, adds it to the record as a top-level field named sentiment, and returns the record.
Count the records with sentiment and show the schema. This takes a few minutes because Spark must initialize the library and run the sentiment analysis, which can be involved.
Notice that all records were processed (14,063), and the sentiment value was added to the schema.
Finally, let’s pick out the titles that have the lowest sentiment (less than 1.5). Count them and print out a sample to see what some of the titles look like.
val pressing_issues = issue_sentiments.filter(_.getField("sentiment").exists(_.asInstanceOf[Double] < 1.5))
pressing_issues.count
pressing_issues.show(10)
Next, write them all out to Amazon S3 so that you can handle them later; the getSinkWithFormat() call in the full job script shows how. (You’ll need to replace the output path with your own.)
Take a look in the output path, and you can see the output files.
Putting it all together
Now, let’s create a job from the preceding interactive session. The following script combines all the commands from earlier. It processes the GitHub archive files and writes out the highly negative issues:
import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.types._
import org.apache.spark.SparkContext
import java.util.Properties
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import scala.collection.convert.wrapAll._
object GlueApp {
object myNLP {
val props = new Properties()
props.setProperty("annotators", "tokenize, ssplit, parse, sentiment")
props.setProperty("parse.maxlen", "70")
lazy val coreNLP = new StanfordCoreNLP(props)
}
def estimatedSentiment(text: String): Double = {
if ((text == null) || (!text.nonEmpty)) { return Double.NaN }
val annotations = myNLP.coreNLP.process(text)
val sentences = annotations.get(classOf[CoreAnnotations.SentencesAnnotation])
sentences.foldLeft(0.0)( (csum, x) => {
csum + RNNCoreAnnotations.getPredictedClass(x.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree]))
}) / sentences.length
}
def main(sysArgs: Array[String]) {
val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)
val dbname = "githubarchive"
val tblname = "data"
val outpath = "s3://<bucket>/out/path/"
val github_events = glueContext
.getCatalogSource(database = dbname, tableName = tblname)
.getDynamicFrame()
val issue_events = github_events.filter((rec: DynamicRecord) => {
rec.getField("type").exists(_ == "IssuesEvent")
})
val issue_titles = issue_events.applyMapping(Seq(("id", "string", "id", "string"),
("actor.login", "string", "actor", "string"),
("repo.name", "string", "repo", "string"),
("payload.action", "string", "action", "string"),
("payload.issue.title", "string", "title", "string")))
val issue_sentiments = issue_titles.map((rec: DynamicRecord) => {
val mbody = rec.getField("title")
mbody match {
case Some(mval: String) => {
rec.addField("sentiment", ScalarNode(estimatedSentiment(mval)))
rec }
case _ => rec
}
})
val pressing_issues = issue_sentiments.filter(_.getField("sentiment").exists(_.asInstanceOf[Double] < 1.5))
glueContext.getSinkWithFormat(connectionType = "s3",
options = JsonOptions(s"""{"path": "$outpath"}"""),
format = "json")
.writeDynamicFrame(pressing_issues)
}
}
Notice that the script is enclosed in a top-level object called GlueApp, which serves as the script’s entry point for the job. (You’ll need to replace the output path with your own.) Upload the script to an Amazon S3 location so that AWS Glue can load it when needed.
To create the job, open the AWS Glue console. Choose Jobs in the left navigation pane, and then choose Add job. Create a name for the job, and specify a role with permissions to access the data. Choose An existing script that you provide, and choose Scala as the language.
For the Scala class name, type GlueApp to indicate the script’s entry point. Specify the Amazon S3 location of the script.
Choose Script libraries and job parameters. In the Dependent jars path field, enter the Amazon S3 locations of the Stanford CoreNLP libraries from earlier as a comma-separated list (without spaces). Then choose Next.
No connections are needed for this job, so choose Next again. Review the job properties, and choose Finish. Finally, choose Run job to execute the job.
You can simply edit the script’s input table and output path to run this job on whatever GitHub timeline datasets that you might have.
Conclusion
In this post, we showed how to write AWS Glue ETL scripts in Scala via notebooks and how to run them as jobs. Scala has the advantage that it is the native language for the Spark runtime. With Scala, it is easier to call Scala or Java functions and third-party libraries for analyses. Moreover, data processing is faster in Scala because there’s no need to convert records from one language runtime to another.
You can find more examples of Scala scripts in our GitHub examples repository: https://github.com/awslabs/aws-glue-samples. We encourage you to experiment with Scala scripts and let us know about any interesting ETL flows that you want to share.
Mehul Shah is a senior software manager for AWS Glue. His passion is leveraging the cloud to build smarter, more efficient, and easier to use data systems. He has three girls, and, therefore, he has no spare time.
Ben Sowell is a software development engineer at AWS Glue.
Vinay Vivili is a software development engineer for AWS Glue.
Many enterprises use Microsoft Active Directory to manage users, groups, and computers in a network. A question we hear frequently is: How can Active Directory users access big data workloads running on Amazon EMR with the same single sign-on (SSO) experience they have when accessing resources in the Active Directory network?
This post walks you through the process of using AWS CloudFormation to set up a cross-realm trust and extend authentication from an Active Directory network into an Amazon EMR cluster with Kerberos enabled. By establishing a cross-realm trust, Active Directory users can use their Active Directory credentials to access an Amazon EMR cluster and run jobs as themselves.
Walkthrough overview
In this example, you build a solution that allows Active Directory users to seamlessly access Amazon EMR clusters and run big data jobs. Here’s what you need before setting up this solution:
A possible limit increase for your account (Note: Usually a limit increase will not be necessary. See the AWS Service Limits documentation if you encounter a limit error while building the solution.)
To make it easier for you to get started, I created AWS CloudFormation templates that automatically configure and deploy the solution for you. The sections that follow describe the steps and resources involved in setting up the solution.
Note: If you want to manually create and configure the components for this solution without using AWS CloudFormation, refer to the Amazon EMR cross-realm documentation. IMPORTANT: The AWS CloudFormation templates used in this post are designed to work only in the us-east-1 (N. Virginia) Region. They are not intended for production use without modification.
Single-step solution deployment
If you don’t want to set up each component individually, you can use the single-step AWS CloudFormation template. The single-step template is a master template that uses nested stacks (additional templates) to launch and configure all the resources for the solution in one go.
To deploy the single-step template into your account, choose Launch Stack:
This takes you to the Create stack wizard in the AWS CloudFormation console. The template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region because the template is designed to work only in us-east-1 (N. Virginia).
On the Select Template page, keep the default URL for the AWS CloudFormation template, and then choose Next.
On the Specify Details page, review the parameters for the template. Provide values for the parameters that require input (for more information, see the parameters table that follows).
The following parameters are available in this template.
Domain Controller name (default: DC1): NetBIOS (hostname) name of the Active Directory server. This name can be up to 15 characters long.
Active Directory domain (default: example.com): Fully qualified domain name (FQDN) of the forest root domain (for example, example.com).
Domain NetBIOS name (default: EXAMPLE): NetBIOS name of the domain for users of earlier versions of Windows. This name can be up to 15 characters long.
Domain admin user (default: CrossRealmAdmin): User name for the account that is added as domain administrator. This account is separate from the default administrator account.
Domain admin password (requires input): Password for the domain admin user. Must be at least eight characters including letters, numbers, and symbols.
Key pair name (requires input): Name of an existing key pair, which enables you to connect securely to your instance after it launches.
Instance type (default: m4.xlarge): Instance type for the domain controller and the Amazon EMR cluster.
Allowed IP address (default: 10.0.0.0/16): The client IP address range that can reach your cluster. Specify an IP address range in CIDR notation (for example, 203.0.113.5/32). By default, only the VPC CIDR (10.0.0.0/16) can reach the cluster. Be sure to add your client IP range so that you can connect to the cluster using SSH.
EMR Kerberos realm (default: EC2.INTERNAL): Cluster’s Kerberos realm name. By default, the realm name is derived from the cluster’s VPC domain name in uppercase letters (for example, EC2.INTERNAL is the default VPC domain name in the us-east-1 Region).
Trusted AD domain (default: EXAMPLE.COM): The Active Directory (AD) domain that you want to trust. This is the same as the “Active Directory domain.” However, it must use all uppercase letters (for example, EXAMPLE.COM).
Cross-realm trust password (requires input): Password that you want to use for your cross-realm trust.
Instance count (default: 2): The number of instances (core nodes) for the cluster.
EMR applications (default: Hadoop, Spark, Ganglia, Hive): Comma-separated list of applications to install on the cluster.
After you specify the template details, choose Next. On the Options page, choose Next again. On the Review page, select the I acknowledge that AWS CloudFormation might create IAM resources with custom names check box, and then choose Create.
It takes approximately 45 minutes for the deployment to complete. When the stack launch is complete, it will return outputs with information about the resources that were created. Note the outputs and skip to the Managing and testing the solution section. You can view the stack outputs on the AWS Management Console or with the AWS CLI (aws cloudformation describe-stacks).
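If you prefer to script this check, the following is a minimal sketch that reads the outputs with boto3; the stack name is a placeholder for the name you gave your stack in the Create stack wizard.
import boto3

cloudformation = boto3.client('cloudformation', region_name='us-east-1')

# Placeholder stack name: use the name you chose when launching the stack.
response = cloudformation.describe_stacks(StackName='emr-crossrealm-trust')

for output in response['Stacks'][0]['Outputs']:
    print(output['OutputKey'], '=', output['OutputValue'])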
Deploying each step individually
This section describes how to use AWS CloudFormation templates to perform each step of the solution separately.
Create and configure an Amazon VPC
In order for you to establish a cross-realm trust between an Amazon EMR Kerberos realm and an Active Directory domain, your Amazon VPC must meet the following requirements:
The subnet used for the Amazon EMR cluster must have a CIDR block of fewer than nine digits (for example, 10.0.1.0/24).
Both DNS resolution and DNS hostnames must be enabled (set to “yes”).
The Active Directory domain controller must be the DNS server for instances in the Amazon VPC (this is configured in the next step).
To use the AWS CloudFormation template to create and configure an Amazon VPC with the prerequisites listed previously, choose Launch Stack:
Note: If you want to create the VPC manually (without using AWS CloudFormation), see Set Up the VPC and Subnet in the Amazon EMR documentation.
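If you do set up the VPC manually, the DNS requirements listed previously can be enabled with a couple of boto3 calls. This is only a sketch; the VPC ID is a placeholder.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')
vpc_id = 'vpc-xxxxxxxx'  # placeholder: the ID of the VPC you created

# Each VPC attribute must be modified in its own call.
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsSupport={'Value': True})
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsHostnames={'Value': True})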
Launching this stack creates the following AWS resources:
Amazon VPC with CIDR block 10.0.0.0/16 (Name: CrossRealmVPC)
Internet Gateway (Name: CrossRealmGateway)
Public subnet with CIDR block 10.0.1.0/24 (Name: CrossRealmSubnet)
Security group allowing inbound access from the VPC’s subnets (Name tag: CrossRealmSecurityGroup)
When the stack launch is complete, it should return outputs similar to the following.
SubnetID (value example: subnet-xxxxxxxx): The subnet for the Active Directory domain controller and the EMR cluster.
SecurityGroup (value example: sg-xxxxxxxx): The security group for the Active Directory domain controller.
VPCID (value example: vpc-xxxxxxxx): The Active Directory domain controller and EMR cluster will be launched on this VPC.
Note the outputs because they are used in the next step. You can view the stack outputs on the AWS Management Console or with the AWS CLI (aws cloudformation describe-stacks), as described in the previous section.
Launch and configure an Active Directory domain controller
In this step, you use an AWS CloudFormation template to automatically launch and configure a new Active Directory domain controller and cross-realm trust.
Note: There are various ways to install and configure an Active Directory domain controller. For details on manually launching and installing a domain controller without AWS CloudFormation, see Step 2: Launch and Install the AD Domain Controller in the Amazon EMR documentation.
In addition to launching and configuring an Active Directory domain controller and cross-realm trust, this AWS CloudFormation template also sets the domain controller as the DNS server (name server) for your Amazon VPC. In other words, the template creates a new DHCP option set for the VPC where it is deployed and sets the private IP address of the domain controller as the name server for that option set.
IMPORTANT: You should not use this template on a production VPC with existing resources like Amazon EC2 instances. When you launch this stack, make sure that you use the new environment and resources (Amazon VPC, subnet, and security group) that were created in the Create and configure an Amazon VPC step.
To launch this stack, choose Launch Stack:
The following table contains information about the parameters available in this template. Review the parameters for the template and provide values for those that require input.
VPC ID (requires input): Launch the domain controller on this VPC (for example, use the VPC created in the Create and configure an Amazon VPC step).
Subnet ID (requires input): Subnet used for the domain controller (for example, use the subnet created in the Create and configure an Amazon VPC step).
Security group ID (requires input): Security group (SG) for the domain controller (for example, use the SG created in the Create and configure an Amazon VPC step).
Domain Controller name (default: DC1): NetBIOS name of the Active Directory server (up to 15 characters).
Active Directory domain (default: example.com): Fully qualified domain name (FQDN) of the forest root domain (for example, example.com).
Domain NetBIOS name (default: EXAMPLE): NetBIOS name of the domain for users of earlier versions of Windows. This name can be up to 15 characters long.
Domain admin user (default: CrossRealmAdmin): User name for the account that is added as domain administrator. This account is separate from the default administrator account.
Domain admin password (requires input): Password for the domain admin user. Must be at least eight characters including letters, numbers, and symbols.
Key pair name (requires input): Name of an existing EC2 key pair to enable access to the domain controller instance.
Instance type (default: m4.xlarge): Instance type for the domain controller.
EMR Kerberos realm (default: EC2.INTERNAL): Cluster’s Kerberos realm name. By default, the realm name is derived from the cluster’s VPC domain name in uppercase letters (for example, EC2.INTERNAL is the default VPC domain name in the us-east-1 Region).
Cross-realm trust password (requires input): Password that you want to use for your cross-realm trust.
It takes 25–30 minutes for this stack to be created. When it’s complete, note the stack’s outputs, and then move to the next step: Launch an EMR cluster with Kerberos enabled.
Create a security configuration and launch an Amazon EMR cluster with Kerberos enabled
To launch a kerberized Amazon EMR cluster, you first need to create a security configuration containing the cross-realm trust configuration. You then specify cluster-specific Kerberos attributes when launching the cluster.
In this step, you use AWS CloudFormation to launch and configure a kerberized Amazon EMR cluster with a cross-realm trust. If you want to manually launch and configure a cluster with Kerberos enabled, see Step 6: Launch a Kerberized EMR Cluster in the Amazon EMR documentation.
Note: At the time of this writing, AWS CloudFormation does not yet support launching Amazon EMR clusters with Kerberos authentication enabled. To overcome this limitation, I created a template that uses an AWS Lambda-backed custom resource to launch and configure the Amazon EMR cluster with Kerberos enabled. If you use this template, there’s nothing else that you need to do. Just keep in mind that the template creates and invokes an AWS Lambda function (custom resource) to launch the cluster.
To create a cross-realm trust security configuration and launch a kerberized Amazon EMR cluster using AWS CloudFormation, choose Launch Stack:
The following table lists and describes the template parameters for deploying a kerberized Amazon EMR cluster and configuring a cross-realm trust.
Active Directory domain (default: example.com): The Active Directory domain that you want to establish the cross-realm trust with.
Domain admin user, or joiner user (default: CrossRealmAdmin): The user name of an Active Directory domain user with privileges to join domains/computers to the Active Directory domain (joiner user).
Domain admin password (requires input): Password of the joiner user.
Cross-realm trust password (requires input): Password of your cross-realm trust.
EC2 key pair name (requires input): Name of an existing key pair, which enables you to connect securely to your cluster after it launches.
Subnet ID (requires input): Subnet that you want to use for your Amazon EMR cluster (for example, choose the subnet created in the Create and configure an Amazon VPC step).
Security group ID (requires input): Security group that you want to use for your Amazon EMR cluster (for example, choose the security group created in the Create and configure an Amazon VPC step).
Instance type (default: m4.xlarge): The instance type that you want to use for the cluster nodes.
Instance count (default: 2): The number of instances (core nodes) for the cluster.
Allowed IP address (default: 10.0.0.0/16): The client IP address range that can reach your cluster. Specify an IP address range in CIDR notation (for example, 203.0.113.5/32). By default, only the VPC CIDR (10.0.0.0/16) can reach the cluster. Be sure to add your client IP range so that you can connect to the cluster using SSH.
EMR applications (default: Hadoop, Spark, Ganglia, Hive): Comma-separated list of the applications that you want installed on the cluster.
EMR Kerberos realm (default: EC2.INTERNAL): Cluster’s Kerberos realm name. By default, the realm name is derived from the cluster’s VPC domain name in uppercase letters (for example, EC2.INTERNAL is the default VPC domain name in the us-east-1 Region).
Trusted AD domain (default: EXAMPLE.COM): The Active Directory domain that you want to trust. This name is the same as the “AD domain name.” However, it must use all uppercase letters (for example, EXAMPLE.COM).
It takes 10–15 minutes for this stack to be created. When it’s complete, note the stack’s outputs, and then move to the next section: Managing and testing the solution.
Managing and testing the solution
Now that you’ve configured and built the solution, it’s time to test it by connecting to a cluster using Active Directory credentials.
SSH to a cluster using Active Directory credentials (single sign-on)
After you launch a kerberized Amazon EMR cluster, if you used the AWS CloudFormation templates and added your client IP range to the Allowed IP address parameter, you should be able to connect to the cluster using an SSH client and your Active Directory user credentials. If you have trouble connecting to the cluster using SSH, check the cluster’s security group to make sure that it allows inbound SSH connection (TCP port 22) from your client’s IP address (source).
The following steps assume that you’re using a client such as OpenSSH. If you’re using a different SSH application (for example, PuTTY), consult the application-specific documentation.
Note: Because the cluster was launched with a cross-realm trust configuration, you don’t need to use a private key (.pem file) when you connect to it as a domain user using SSH.
To connect to your Amazon EMR cluster as an Active Directory user using SSH, run the following command. Replace ad_user with the domain admin user that you created while setting up the domain controller and replace master_node_URL with the cluster’s URL (see the stack’s outputs to find this information):
$ ssh -l <ad_user> <master_node_URL>
If your SSH client is configured to use a key as the preferred authentication method, the login might fail. If that’s the case, you can force password authentication by adding options such as -o PreferredAuthentications=password -o PubkeyAuthentication=no to your SSH command.
After a domain user connects to the cluster using SSH, if this is the first time that the user is connecting, a local home directory is created for that user. In addition to creating a local home directory, if you used the create-hdfs-home-ba.sh bootstrap action when launching the cluster (done by default if you used the AWS CloudFormation template to launch a kerberized cluster), an HDFS user home directory is also automatically created.
Note: If you manually launched the cluster and did not use the create-hdfs-home-ba.sh bootstrap action, then you’ll need to manually create HDFS user home directories for your users.
When you connect to the cluster using SSH for the first time (as a domain user), you should see the following messages if the HDFS home directory for your domain user was successfully created:
Running jobs on a kerberized Amazon EMR cluster
To run a job on a kerberized cluster, the user submitting the job must first be authenticated. If you followed the previous section to connect to your cluster as an Active Directory user using SSH, the user should be authenticated automatically.
If running the klist command returns a “No credentials cache found” message, it means that the user is not authenticated (the user doesn’t have a Kerberos ticket). You can re-authenticate a user at any time by running the following command (be sure to use all uppercase letters for the Active Directory domain):
$ kinit <username>@<AD_DOMAIN>
When the user is authenticated, they can submit jobs just like they would on a non-kerberized cluster.
Auditing jobs
Another advantage that Kerberos can provide is that you can easily tell which user ran a particular job. For example, connect (using SSH) to a kerberized cluster with an Active Directory user, and submit the SparkPi sample application:
$ spark-example SparkPi
After running the SparkPi application, go to the Amazon EMR console and choose your cluster. Then choose the Application history tab. There you can see information about the application, including the user that submitted the job:
Common issues
Although it would be hard to cover every possible Kerberos issue, this section covers some of the more common issues that might occur and ways to fix them.
Issue 1: You can successfully connect and get authenticated on a cluster. However, whenever you try running a job, it fails with an error similar to the following:
Solution: Make sure that an HDFS home directory for the user was created and that it has the right permissions.
Issue 2: You can successfully connect to the cluster, but you can’t run any Hadoop or HDFS commands.
Solution: Use the klist command to confirm whether the user is authenticated and has a valid Kerberos ticket. Use the kinit command to re-authenticate a user.
Issue 3: You can’t connect (using SSH) to the cluster using Active Directory user credentials, but you can manually authenticate the user with kinit.
Solution: Make sure that the Active Directory domain controller is the DNS server (name server) for the cluster nodes.
Cleaning up
After completing and testing this solution, remember to clean up the resources. If you used the AWS CloudFormation templates to create the resources, then use the AWS CloudFormation console or AWS CLI/SDK to delete the stacks. Deleting a stack also deletes the resources created by that stack.
If one of your stacks does not delete, make sure that there are no dependencies on the resources created by that stack. For example, if you deployed an Amazon VPC using AWS CloudFormation and then deployed a domain controller into that VPC using a different AWS CloudFormation stack, you must first delete the domain controller stack before the VPC stack can be deleted.
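If you scripted your deployment, you can script the cleanup as well. The following sketch deletes the stacks in reverse order of their dependencies; the stack names are placeholders for whatever you named your stacks.
import boto3

cloudformation = boto3.client('cloudformation', region_name='us-east-1')
waiter = cloudformation.get_waiter('stack_delete_complete')

# Delete the dependent stacks first (EMR cluster, then domain controller), then the VPC stack.
for stack_name in ['emr-kerberos-cluster', 'ad-domain-controller', 'cross-realm-vpc']:
    cloudformation.delete_stack(StackName=stack_name)
    waiter.wait(StackName=stack_name)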
Summary
The ability to authenticate users and services with Kerberos not only allows you to secure your big data applications, but it also enables you to easily integrate Amazon EMR clusters with an Active Directory environment. This post showed how you can use Kerberos on Amazon EMR to create a single sign-on solution where Active Directory domain users can seamlessly access Amazon EMR clusters and run big data applications. We also showed how you can use AWS CloudFormation to automate the deployment of this solution.
With Amazon Redshift, you can build petabyte-scale data warehouses that unify data from a variety of internal and external sources. Because Amazon Redshift is optimized for complex queries (often involving multiple joins) across large tables, it can handle large volumes of retail, inventory, and financial data without breaking a sweat.
In this post, we describe how to combine data in Amazon Aurora with data in Amazon Redshift. Here’s an overview of the solution:
Use AWS Lambda functions with Amazon Aurora to capture data changes in a table.
Save the captured data changes to Amazon S3 by using Amazon Kinesis Data Firehose.
Query the data in Amazon S3 in place by using Amazon Redshift Spectrum, and combine it with data in your Amazon Redshift cluster for analysis.
Serverless architecture for capturing and analyzing Aurora data changes
Consider a scenario in which an e-commerce web application uses Amazon Aurora for a transactional database layer. The company has a sales table that captures every single sale, along with a few corresponding data items. This information is stored as immutable data in a table. Business users want to monitor the sales data and then analyze and visualize it.
In this example, you take the changes in data in an Aurora database table and save it in Amazon S3. After the data is captured in Amazon S3, you combine it with data in your existing Amazon Redshift cluster for analysis.
By the end of this post, you will understand how to capture data events in an Aurora table and push them out to other AWS services using AWS Lambda.
The following diagram shows the flow of data as it occurs in this tutorial:
The starting point in this architecture is a database insert operation in Amazon Aurora. When the insert statement is executed, a custom trigger calls a Lambda function and forwards the inserted data. Lambda writes the data that it received from Amazon Aurora to a Kinesis data delivery stream. Kinesis Data Firehose writes the data to an Amazon S3 bucket. Once the data is in an Amazon S3 bucket, it is queried in place using Amazon Redshift Spectrum.
Creating an Aurora database
First, create a database by following these steps in the Amazon RDS console:
Sign in to the AWS Management Console, and open the Amazon RDS console.
Choose Launch a DB instance, and choose Next.
For Engine, choose Amazon Aurora.
Choose a DB instance class. This example uses a small instance class, since this is not a production database.
In Multi-AZ deployment, choose No.
Configure DB instance identifier, Master username, and Master password.
Launch the DB instance.
After you create the database, use MySQL Workbench to connect to the database using the CNAME from the console. For information about connecting to an Aurora database, see Connecting to an Amazon Aurora DB Cluster.
The following screenshot shows the MySQL Workbench configuration:
Next, create a table in the database by running the following SQL statement:
Create Table
CREATE TABLE Sales (
InvoiceID int NOT NULL AUTO_INCREMENT,
ItemID int NOT NULL,
Category varchar(255),
Price double(10,2),
Quantity int not NULL,
OrderDate timestamp,
DestinationState varchar(2),
ShippingType varchar(255),
Referral varchar(255),
PRIMARY KEY (InvoiceID)
)
You can now populate the table with some sample data by running a small script against the database. Replace the connection variables with appropriate values for your environment.
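The original data-generation script is not reproduced here; the following is a minimal sketch that inserts random rows into the Sales table using the pymysql client. The connection values and the sample column values are placeholders for illustration only.
import datetime
import random
import pymysql

# Placeholders: replace with your Aurora cluster endpoint and credentials.
connection = pymysql.connect(host='your-aurora-endpoint',
                             user='master_user',
                             password='master_password',
                             db='your_database')

categories = ['Garden', 'Books', 'Computers', 'Toys', 'Grocery']
states = ['NY', 'CA', 'TX', 'WA', 'FL']
shipping_types = ['Free', 'Two-Day', 'Overnight']
referrals = ['Search', 'Email', 'Direct']

try:
    with connection.cursor() as cursor:
        for _ in range(100):
            cursor.execute(
                """INSERT INTO Sales
                   (ItemID, Category, Price, Quantity, OrderDate,
                    DestinationState, ShippingType, Referral)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (random.randint(1, 1000),
                 random.choice(categories),
                 round(random.uniform(5, 500), 2),
                 random.randint(1, 10),
                 datetime.datetime.now(),
                 random.choice(states),
                 random.choice(shipping_types),
                 random.choice(referrals)))
    connection.commit()
finally:
    connection.close()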
The following screenshot shows how the table appears with the sample data:
Sending data from Amazon Aurora to Amazon S3
There are two methods available to send data from Amazon Aurora to Amazon S3:
Using a Lambda function
Using SELECT INTO OUTFILE S3
To demonstrate the ease of setting up integration between multiple AWS services, we use a Lambda function to send data to Amazon S3 using Amazon Kinesis Data Firehose.
Alternatively, you can use a SELECT INTO OUTFILE S3 statement to query data from an Amazon Aurora DB cluster and save it directly in text files that are stored in an Amazon S3 bucket. However, with this method, there is a delay between the time that the database transaction occurs and the time that the data is exported to Amazon S3 because the default file size threshold is 6 GB.
Creating a Kinesis data delivery stream
The next step is to create a Kinesis data delivery stream, since it’s a dependency of the Lambda function.
To create a delivery stream:
Open the Kinesis Data Firehose console
Choose Create delivery stream.
For Delivery stream name, type AuroraChangesToS3.
For Source, choose Direct PUT.
For Record transformation, choose Disabled.
For Destination, choose Amazon S3.
In the S3 bucket drop-down list, choose an existing bucket, or create a new one.
Enter a prefix if needed, and choose Next.
For Data compression, choose GZIP.
In IAM role, choose either an existing role that has access to write to Amazon S3, or choose to generate one automatically. Choose Next.
Review all the details on the screen, and choose Create delivery stream when you’re finished.
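If you would rather script this step than click through the console, the delivery stream can also be created with boto3. The bucket and role ARNs below are placeholders.
import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.create_delivery_stream(
    DeliveryStreamName='AuroraChangesToS3',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        # Placeholders: an IAM role that can write to the bucket, and your bucket's ARN.
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-to-s3-role',
        'BucketARN': 'arn:aws:s3:::your-bucket-name',
        'Prefix': 'aurora-changes/',
        'CompressionFormat': 'GZIP',
    },
)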
Creating a Lambda function
Now you can create a Lambda function that is called every time there is a change that needs to be tracked in the database table. This Lambda function passes the data to the Kinesis data delivery stream that you created earlier.
To create the Lambda function:
Open the AWS Lambda console.
Ensure that you are in the AWS Region where your Amazon Aurora database is located.
If you have no Lambda functions yet, choose Get started now. Otherwise, choose Create function.
Choose Author from scratch.
Give your function a name, and select Python 3.6 for the runtime.
Choose an existing role or create a new one; the role needs permission to call firehose:PutRecord.
Choose Next on the trigger selection screen.
Paste the following code in the code window. Change the stream_name variable to the Kinesis data delivery stream that you created in the previous step.
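The original function code is not reproduced here; the following is a minimal sketch of a handler that forwards whatever record Aurora passes to the delivery stream. The stream_name value matches the delivery stream created earlier; adjust it if yours differs.
import json
import boto3

firehose = boto3.client('firehose')
stream_name = 'AuroraChangesToS3'  # the delivery stream created in the previous step

def lambda_handler(event, context):
    # The Aurora trigger passes the inserted row as a JSON document.
    # Append a newline so that records are separated in the Amazon S3 objects.
    firehose.put_record(
        DeliveryStreamName=stream_name,
        Record={'Data': json.dumps(event) + '\n'}
    )
    return 'OK'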
Choose File -> Save in the code editor and then choose Save.
Once you are finished, make sure that the Amazon Aurora database has permission to invoke the Lambda function (the Aurora cluster must be associated with an IAM role that allows it to call AWS Lambda).
Creating a stored procedure and a trigger in Amazon Aurora
Now, go back to MySQL Workbench, and run the following command to create a new stored procedure. When this stored procedure is called, it invokes the Lambda function you created. Change the ARN in the following code to your Lambda function’s ARN.
DROP PROCEDURE IF EXISTS CDC_TO_FIREHOSE;
DELIMITER ;;
CREATE PROCEDURE CDC_TO_FIREHOSE (IN ItemID VARCHAR(255),
IN Category varchar(255),
IN Price double(10,2),
IN Quantity int(11),
IN OrderDate timestamp,
IN DestinationState varchar(2),
IN ShippingType varchar(255),
IN Referral varchar(255)) LANGUAGE SQL
BEGIN
CALL mysql.lambda_async('arn:aws:lambda:us-east-1:XXXXXXXXXXXXX:function:CDCFromAuroraToKinesis',
CONCAT('{ "ItemID" : "', ItemID,
'", "Category" : "', Category,
'", "Price" : "', Price,
'", "Quantity" : "', Quantity,
'", "OrderDate" : "', OrderDate,
'", "DestinationState" : "', DestinationState,
'", "ShippingType" : "', ShippingType,
'", "Referral" : "', Referral, '"}')
);
END
;;
DELIMITER ;
Create a trigger TR_Sales_CDC on the Sales table. When a new record is inserted, this trigger calls the CDC_TO_FIREHOSE stored procedure.
DROP TRIGGER IF EXISTS TR_Sales_CDC;
DELIMITER ;;
CREATE TRIGGER TR_Sales_CDC
AFTER INSERT ON Sales
FOR EACH ROW
BEGIN
SELECT NEW.ItemID , NEW.Category, New.Price, New.Quantity, New.OrderDate
, New.DestinationState, New.ShippingType, New.Referral
INTO @ItemID , @Category, @Price, @Quantity, @OrderDate
, @DestinationState, @ShippingType, @Referral;
CALL CDC_TO_FIREHOSE(@ItemID , @Category, @Price, @Quantity, @OrderDate
, @DestinationState, @ShippingType, @Referral);
END
;;
DELIMITER ;
If a new row is inserted in the Sales table, the Lambda function that is mentioned in the stored procedure is invoked.
Verify that data is being sent from the Lambda function to Kinesis Data Firehose to Amazon S3 successfully. You might have to insert a few records, depending on the size of your data, before new records appear in Amazon S3. This is due to Kinesis Data Firehose buffering. To learn more about Kinesis Data Firehose buffering, see the “Amazon S3” section in Amazon Kinesis Data Firehose Data Delivery.
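One quick way to confirm that records are arriving is to list the newest objects under the delivery stream’s S3 prefix with boto3; the bucket and prefix below are placeholders for the values you configured on the delivery stream.
import boto3

s3 = boto3.client('s3')

response = s3.list_objects_v2(Bucket='your-bucket-name', Prefix='aurora-changes/')

# Print the ten most recently written objects.
for obj in sorted(response.get('Contents', []),
                  key=lambda o: o['LastModified'], reverse=True)[:10]:
    print(obj['LastModified'], obj['Key'], obj['Size'])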
Every time a new record is inserted in the sales table, the stored procedure is called, and the new data eventually lands in Amazon S3 through Kinesis Data Firehose.
Querying data in Amazon Redshift
In this section, you use the data you produced from Amazon Aurora and consume it as-is in Amazon Redshift. In order to allow you to process your data as-is, where it is, while taking advantage of the power and flexibility of Amazon Redshift, you use Amazon Redshift Spectrum. You can use Redshift Spectrum to run complex queries on data stored in Amazon S3, with no need for loading or other data prep.
Just create a data source and issue your queries to your Amazon Redshift cluster as usual. Behind the scenes, Redshift Spectrum scales to thousands of instances on a per-query basis, ensuring that you get fast, consistent performance even as your dataset grows to beyond an exabyte! Being able to query data that is stored in Amazon S3 means that you can scale your compute and your storage independently. You have the full power of the Amazon Redshift query model and all the reporting and business intelligence tools at your disposal. Your queries can reference any combination of data stored in Amazon Redshift tables and in Amazon S3.
Redshift Spectrum supports open, common data types, including CSV/TSV, Apache Parquet, SequenceFile, and RCFile. Files can be compressed using gzip or Snappy, with other data types and compression methods in the works.
Next, create an IAM role that has access to Amazon S3 and Athena. By default, Amazon Redshift Spectrum uses the Amazon Athena data catalog. Your cluster needs authorization to access your external data catalog in AWS Glue or Athena and your data files in Amazon S3.
In the demo setup, I attached AmazonS3FullAccess and AmazonAthenaFullAccess. In a production environment, the IAM roles should follow the standard security of granting least privilege. For more information, see IAM Policies for Amazon Redshift Spectrum.
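If you prefer to create the role programmatically, here is a rough sketch with boto3 that creates a role Amazon Redshift can assume and attaches the two managed policies mentioned above. The role name is a placeholder; it should match the role you reference in the IAM_ROLE clause of the next statement, and you still need to associate the role with your Amazon Redshift cluster.
import json
import boto3

iam = boto3.client('iam')

# Trust policy that lets Amazon Redshift assume the role.
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'redshift.amazonaws.com'},
        'Action': 'sts:AssumeRole',
    }],
}

iam.create_role(RoleName='RedshiftSpectrumRole',
                AssumeRolePolicyDocument=json.dumps(trust_policy))

# Broad managed policies for the demo; scope these down in production.
for policy_arn in ['arn:aws:iam::aws:policy/AmazonS3FullAccess',
                   'arn:aws:iam::aws:policy/AmazonAthenaFullAccess']:
    iam.attach_role_policy(RoleName='RedshiftSpectrumRole', PolicyArn=policy_arn)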
create external schema if not exists spectrum_schema
from data catalog
database 'spectrum_db'
region 'us-east-1'
IAM_ROLE 'arn:aws:iam::XXXXXXXXXXXX:role/RedshiftSpectrumRole'
create external database if not exists;
Don’t forget to replace the IAM role in the statement.
Then create an external table within the database:
CREATE EXTERNAL TABLE IF NOT EXISTS spectrum_schema.ecommerce_sales(
ItemID int,
Category varchar,
Price DOUBLE PRECISION,
Quantity int,
OrderDate TIMESTAMP,
DestinationState varchar,
ShippingType varchar,
Referral varchar)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
LOCATION 's3://{BUCKET_NAME}/CDC/'
Query the table, and it should contain data. This is a fact table.
select top 10 * from spectrum_schema.ecommerce_sales
Next, create a dimension table. For this example, we create a date/time dimension table. Create the table:
CREATE TABLE date_dimension (
d_datekey integer not null sortkey,
d_dayofmonth integer not null,
d_monthnum integer not null,
d_dayofweek varchar(10) not null,
d_prettydate date not null,
d_quarter integer not null,
d_half integer not null,
d_year integer not null,
d_season varchar(10) not null,
d_fiscalyear integer not null)
diststyle all;
Populate the table with data:
copy date_dimension from 's3://reparmar-lab/2016dates'
iam_role 'arn:aws:iam::XXXXXXXXXXXX:role/redshiftspectrum'
DELIMITER ','
dateformat 'auto';
The date dimension table should look like the following:
Querying data in local and external tables using Amazon Redshift
Now that you have the fact and dimension table populated with data, you can combine the two and run analysis. For example, if you want to query the total sales amount by season, you can run the following:
select sum(quantity*price) as total_sales, date_dimension.d_season
from spectrum_schema.ecommerce_sales
join date_dimension on spectrum_schema.ecommerce_sales.orderdate = date_dimension.d_prettydate
group by date_dimension.d_season
You get the following results:
Similarly, you can replace d_season with d_dayofweek to get sales figures by weekday:
With Amazon Redshift Spectrum, you pay only for the queries you run against the data that you actually scan. We encourage you to use file partitioning, columnar data formats, and data compression to significantly minimize the amount of data scanned in Amazon S3. This is important for data warehousing because it dramatically improves query performance and reduces cost.
Partitioning your data in Amazon S3 by date, time, or any other custom keys enables Amazon Redshift Spectrum to dynamically prune nonrelevant partitions to minimize the amount of data processed. If you store data in a columnar format, such as Parquet, Amazon Redshift Spectrum scans only the columns needed by your query, rather than processing entire rows. Similarly, if you compress your data using one of the supported compression algorithms in Amazon Redshift Spectrum, less data is scanned.
Analyzing and visualizing Amazon Redshift data in Amazon QuickSight
After modifying the Amazon Redshift security group, go to Amazon QuickSight. Create a new analysis, and choose Amazon Redshift as the data source.
Enter the database connection details, validate the connection, and create the data source.
Choose the schema to be analyzed. In this case, choose spectrum_schema, and then choose the ecommerce_sales table.
Next, we add a custom field for Total Sales = Price*Quantity. In the drop-down list for the ecommerce_sales table, choose Edit analysis data sets.
On the next screen, choose Edit.
In the data prep screen, choose New Field. Add a new calculated field Total Sales $, which is the product of the Price*Quantity fields. Then choose Create. Save and visualize it.
Next, to visualize total sales figures by month, create a graph with Total Sales on the x-axis and Order Date formatted as month on the y-axis.
After you’ve finished, you can use Amazon QuickSight to add different columns from your Amazon Redshift tables and perform different types of visualizations. You can build operational dashboards that continuously monitor your transactional and analytical data. You can publish these dashboards and share them with others.
Final notes
Amazon QuickSight can also read data in Amazon S3 directly. However, with the method demonstrated in this post, you have the option to manipulate, filter, and combine data from multiple sources or Amazon Redshift tables before visualizing it in Amazon QuickSight.
In this example, we dealt with data being inserted, but triggers can be activated in response to an INSERT, UPDATE, or DELETE statement.
Keep the following in mind:
Be careful when invoking a Lambda function from triggers on tables that experience high write traffic. This would result in a large number of calls to your Lambda function. Although calls to the lambda_async procedure are asynchronous, triggers are synchronous.
A statement that results in a large number of trigger activations does not wait for the call to the AWS Lambda function to complete. But it does wait for the triggers to complete before returning control to the client.
Similarly, you must account for Amazon Kinesis Data Firehose limits. By default, Kinesis Data Firehose is limited to a maximum of 5,000 records/second. For more information, see Monitoring Amazon Kinesis Data Firehose.
In certain cases, it may be optimal to use AWS Database Migration Service (AWS DMS) to capture data changes in Aurora and use Amazon S3 as a target. For example, AWS DMS might be a good option if you don’t need to transform data from Amazon Aurora. The method used in this post gives you the flexibility to transform data from Aurora using Lambda before sending it to Amazon S3. Additionally, the architecture has the benefits of being serverless, whereas AWS DMS requires an Amazon EC2 instance for replication.
Re Alvarez-Parmar is a solutions architect for Amazon Web Services. He helps enterprises achieve success through technical guidance and thought leadership. In his spare time, he enjoys spending time with his two kids and exploring outdoors.
You don’t have to worry if you patch. If you download the latest update from Microsoft, Apple, or Linux, then the problem is fixed for you and you don’t have to worry. If you aren’t up to date, then there’s a lot of other nasties out there you should probably also be worrying about. I mention this because while this bug is big in the news, it’s probably not news the average consumer needs to concern themselves with.
This will force a redesign of CPUs and operating systems. While not a big news item for consumers, it’s huge in the geek world. We’ll need to redesign operating systems and how CPUs are made.
Don’t worry about the performance hit. Some, especially avid gamers, are concerned about the claims of “30%” performance reduction when applying the patch. That’s only in some rare cases, so you shouldn’t worry too much about it. As far as I can tell, 3D games are unlikely to see more than a 1% performance degradation. If you imagine your game is suddenly slower after the patch, then something else broke it.
This wasn’t foreseeable. A common cliche is that such bugs happen because people don’t take security seriously, or that they are taking “shortcuts”. That’s not the case here. Speculative execution and timing issues with caches are inherent issues with CPU hardware. “Fixing” this would make CPUs run ten times slower. Thus, while we can tweak hardware going forward, the larger change will be in software.
There’s no good way to disclose this. The cybersecurity industry has a process for coordinating the release of such bugs, which appears to have broken down. In truth, it didn’t. Once Linus announced a security patch that would degrade performance of the Linux kernel, we knew the coming bug was going to be Big. Looking at the Linux patch, tracking backwards to the bug was only a matter of time. Hence, the release of this information was a bit sooner than some wanted. This is to be expected, and is nothing to be upset about.
It helps to have a name. Many are offended by the crassness of naming vulnerabilities and giving them logos. On the other hand, we are going to be talking about these bugs for the next decade. Having a recognizable name, rather than a hard-to-remember number, is useful.
Should I stop buying Intel? Intel has the worst of the bugs here. On the other hand, ARM and AMD alternatives have their own problems. Many want to deploy ARM servers in their data centers, but these are likely to expose bugs you don’t see on x86 servers. The software fix, “page table isolation”, seems to work, so there might not be anything to worry about. On the other hand, holding up purchases because of “fear” of this bug is a good way to squeeze price reductions out of your vendor. Conversely, later generation CPUs, “Haswell” and even “Skylake” seem to have the least performance degradation, so it might be time to upgrade older servers to newer processors.
Intel misleads. Intel has a press release that implies they are not impacted any worse than others. This is wrong: the “Meltdown” issue appears to apply only to Intel CPUs. I don’t like such marketing crap, so I mention it.
We find that AWS customers often require that every query submitted to Presto running on Amazon EMR is logged. They want to track what query was submitted, when it was submitted and who submitted it.
In this blog post, we will demonstrate how to implement and install a Presto event listener for purposes of custom logging, debugging and performance analysis for queries executed on an EMR cluster. An event listener is a plugin to Presto that is invoked when an event such as query creation, query completion, or split completion occurs.
Presto also provides a system connector to access metrics and details about a running cluster. In particular, the system connector gets information about currently running and recently run queries by using the system.runtime.queries table. From the Presto command line interface (CLI), you get this data with the entries Select * from system.runtime.queries; and Select * from system.runtime.tasks;. This connector is available out of the box and exposes information and metrics through SQL.
In addition to providing custom logging and debugging, the Presto event listener allows you to enhance the information provided by the system connector by providing a mechanism to capture detailed query context, query statistics and failure information during query creation or completion, or a split completion.
We will begin by providing a detailed walkthrough of the implementation of the Presto event listener in Java followed by its deployment on the EMR cluster.
Implementation
We use the Eclipse IDE to create a Maven Project, as shown below:
Once you have created the Maven Project, modify the pom.xml file to add the dependency for Presto, as shown following:
After you add the Presto dependency to the pom.xml file, create a Java package under the src/main/java folder. In our project, we have named the package com.amazonaws.QueryEventListener. You can choose the naming convention that best fits your organization. Within this package, create three Java files for the EventListener, the EventListenerFactory, and the EventListenerPlugin.
As the Presto website says: “EventListenerFactory is responsible for creating an EventListener instance. It also defines an EventListener name, which is used by the administrator in a Presto configuration. Implementations of EventListener implement methods for the event types they are interested in handling. The implementation of EventListener and EventListenerFactory must be wrapped as a plugin and installed on the Presto cluster.”
In our project, we have named these Java files QueryEventListener, QueryEventListenerFactory, and QueryEventListenerPlugin:
Now we write our code for the Java files.
QueryEventListener – QueryEventListener implements the Presto EventListener interface. It has a constructor that creates five rotating log files of 524 MB each. After creating QueryEventListener, we implement the query creation, query completion, and split completion methods and log the events relevant to us. You can choose to include more events based on your needs.
QueryEventListenerFactory – The QueryEventListenerFactory class implements the Presto EventListenerFactory interface. We implement the method getName, which provides a registered EventListenerFactory name to Presto. We also implement the create method, which creates an EventListener instance.
You can find the code for the QueryEventListenerFactory class in this Amazon repository.
QueryEventListenerPlugin – The QueryEventListenerPlugin class implements the Presto EventListenerPlugin interface. This class has a getEventListenerFactories method that returns an immutable list containing the EventListenerFactory. Basically, in this class we are wrapping QueryEventListener and QueryEventListenerFactory.
Finally, in our project we add the META-INF folder and a services subfolder within the META-INF folder. In the services subfolder, you create a file called com.facebook.presto.spi.Plugin:
As the Presto documentation describes: “Each plugin identifies an entry point: an implementation of the plugin interface. This class name is provided to Presto via the standard Java ServiceLoader interface: the classpath contains a resource file named com.facebook.presto.spi.Plugin in the META-INF/services directory”.
We add the name of our plugin class com.amazonaws.QueryEventListener.QueryEventListenerPlugin to the com.facebook.presto.spi.Plugin file, as shown below:
Next we will show you how to deploy the Presto plugin we created to Amazon EMR for custom logging.
Presto logging overview on Amazon EMR
Presto by default will produce three log files that capture the configuration properties and the overall operational events of the components that make up Presto, plus log end-user access to the Presto UI.
On Amazon EMR, these log files are written into /var/log/presto, and the log files in this directory are pushed into Amazon S3. The custom log files created by our plugin are written to the same directory, so they are pushed to the same S3 location.
Steps to deploy Presto on Amazon EMR with custom logging
To deploy Presto on EMR with custom logging, you use a bootstrap action. The bootstrap script is available in this Amazon repository.
#!/bin/bash
IS_MASTER=true
if [ -f /mnt/var/lib/info/instance.json ]
then
if grep isMaster /mnt/var/lib/info/instance.json | grep true;
then
IS_MASTER=true
else
IS_MASTER=false
fi
fi
sudo mkdir -p /usr/lib/presto/plugin/queryeventlistener
sudo /usr/bin/aws s3 cp s3://replace-with-your-bucket/QueryEventListener.jar /tmp
sudo cp /tmp/QueryEventListener.jar /usr/lib/presto/plugin/queryeventlistener/
if [ "$IS_MASTER" = true ]; then
sudo mkdir -p /usr/lib/presto/etc
sudo bash -c 'cat <<EOT >> /usr/lib/presto/etc/event-listener.properties
event-listener.name=event-listener
EOT'
fi
First, upload the JAR file created in the last section, and update the S3 location in the bootstrap (s3://replace-with-your-bucket/QueryEventListener.jar) with the bucket where the JAR was placed.
After updating the bootstrap with the S3 location for your JAR, upload that bootstrap to your own bucket.
The bootstrap action will copy the jar file with the custom EventListener implementation into all machines of the cluster. Moreover, the bootstrap action will create a file named event-listener.properties on the Amazon EMR Master node. This file will configure the coordinator to enable the custom logging plugin via property event-listener.name. The event-listener.name property is set to event-listener in the event-listener.properties file. As per Presto documentation, this property is used by Presto to find a registered EventListenerFactory based on the name returned by EventListenerFactory.getName().
Now that the bootstrap is ready, you can create a new EMR cluster with the bootstrap action by using the aws emr create-cluster command and pointing the bootstrap action at the script’s Amazon S3 location.
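The exact CLI command is not reproduced here; as a sketch, the following boto3 call creates a comparable cluster and attaches the bootstrap action. The release label, key pair, instance settings, and S3 path are placeholders to adjust for your environment.
import boto3

emr = boto3.client('emr', region_name='us-east-1')

response = emr.run_job_flow(
    Name='presto-custom-logging',
    ReleaseLabel='emr-5.11.0',          # placeholder: any release that includes Presto
    Applications=[{'Name': 'Presto'}, {'Name': 'Hive'}],
    Instances={
        'InstanceGroups': [
            {'InstanceRole': 'MASTER', 'InstanceType': 'm4.xlarge', 'InstanceCount': 1},
            {'InstanceRole': 'CORE', 'InstanceType': 'm4.xlarge', 'InstanceCount': 2},
        ],
        'Ec2KeyName': 'your-key-pair',  # placeholder
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    BootstrapActions=[{
        'Name': 'InstallQueryEventListener',
        # Placeholder: the S3 location where you uploaded the bootstrap script.
        'ScriptBootstrapAction': {'Path': 's3://replace-with-your-bucket/presto-bootstrap.sh'},
    }],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])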
After we have implemented the Presto custom logging plugin and deployed it, we can run a test and see the output of the log files.
We first create a Hive table with the DDL below from the Hive command line (type hive in the SSH terminal to the EMR master node to access the Hive command line):
CREATE EXTERNAL TABLE wikistats (
language STRING,
page_title STRING,
hits BIGINT,
retrived_size BIGINT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LINES TERMINATED BY '\n'
LOCATION 's3://support.elasticmapreduce/training/datasets/wikistats/';
Then, we access the Presto command line by running the presto-cli command on the terminal and submit a query (for example, a SELECT against the wikistats table we just created).
We then go to the /var/log/presto directory and look at the contents of the log file queries-YYYY-MM-DDTHH\:MM\:SS.0.log. As depicted in the screenshot below, our QueryEventListener plugin captures the fields shown for the Query Created and Query Completed events. Moreover, if there are splits, the plugin will also capture split events.
Note: If you want to include the query text executed by the user for auditing and debugging purposes, add the field appropriately in the QueryEventListener class methods.
Because this is custom logging, you can capture as many fields as are available for the particular events. To find out the fields available for each of the events, see the Java Classes provided by Presto at this GitHub location.
Summary
In this post, you learned how to add custom logging to Presto on EMR to enhance your organization’s auditing capabilities and provide insights into performance.
If you have questions or suggestions, please leave a comment below.
Zafar Kapadia is a Cloud Application Architect for AWS. He works on Application Development and Optimization projects. He is also an avid cricketer and plays in various local leagues.
Francisco Oliveira is a Big Data Engineer with AWS Professional Services. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.
This post courtesy of Ryan Green, Software Development Engineer, AWS Serverless
The concepts of blue/green and canary deployments have been around for a while now and have been well-established as best-practices for reducing the risk of software deployments.
In a traditional, horizontally scaled application, copies of the application code are deployed to multiple nodes (instances, containers, on-premises servers, etc.), typically behind a load balancer. In these applications, deploying new versions of software to too many nodes at the same time can impact application availability as there may not be enough healthy nodes to service requests during the deployment. This aggressive approach to deployments also drastically increases the blast radius of software bugs introduced in the new version and does not typically give adequate time to safely assess the quality of the new version against production traffic.
In such applications, one commonly accepted solution to these problems is to slowly and incrementally roll out application software across the nodes in the fleet while simultaneously verifying application health (canary deployments). Another solution is to stand up an entirely different fleet and weight (or flip) traffic over to the new fleet after verification, ideally with some production traffic (blue/green). Some teams deploy to a single host (“one box environment”), where the new release can bake for some time before promotion to the rest of the fleet. Techniques like this enable the maintainers of complex systems to safely test in production while minimizing customer impact.
Enter Serverless
There is somewhat of an impedance mismatch when mapping these concepts to a serverless world. You can’t incrementally deploy your software across a fleet of servers when there are no servers!* In fact, even the term “deployment” takes on a different meaning with functions as a service (FaaS). In AWS Lambda, a “deployment” can be roughly modeled as a call to CreateFunction, UpdateFunctionCode, or UpdateAlias (I won’t get into the semantics of whether updating configuration counts as a deployment), all of which may affect the version of code that is invoked by clients.
The abstractions provided by Lambda remove the need for developers to be concerned about servers and Availability Zones, and this provides a powerful opportunity to greatly simplify the process of deploying software. *Of course there are servers, but they are abstracted away from the developer.
Traffic shifting with Lambda aliases
Before the release of traffic shifting for Lambda aliases, deployments of a Lambda function could only be performed in a single “flip” by updating function code for version $LATEST, or by updating an alias to target a different function version. After the update propagates, typically within a few seconds, 100% of function invocations execute the new version. Implementing canary deployments with this model required the development of an additional routing layer, further adding development time, complexity, and invocation latency. While rolling back a bad deployment of a Lambda function is a trivial operation and takes effect near instantaneously, deployments of new versions for critical functions can still be a potentially nerve-racking experience.
With the introduction of alias traffic shifting, it is now possible to trivially implement canary deployments of Lambda functions. By updating additional version weights on an alias, invocation traffic is routed to the new function versions based on the weight specified. Detailed CloudWatch metrics for the alias and version can be analyzed during the deployment, or other health checks performed, to ensure that the new version is healthy before proceeding.
Note: Sometimes the term “canary deployments” refers to the release of software to a subset of users. In the case of alias traffic shifting, the new version is released to some percentage of all users. It’s not possible to shard based on identity without adding an additional routing layer.
Examples
The simplest possible use of a canary deployment looks like the following:
# Update $LATEST version of function
aws lambda update-function-code --function-name myfunction ….
# Publish new version of function
aws lambda publish-version --function-name myfunction
# Point alias to new version, weighted at 5% (original version at 95% of traffic)
aws lambda update-alias --function-name myfunction --name myalias --routing-config '{"AdditionalVersionWeights" : {"2" : 0.05} }'
# Verify that the new version is healthy
…
# Set the primary version on the alias to the new version and reset the additional versions (100% weighted)
aws lambda update-alias --function-name myfunction --name myalias --function-version 2 --routing-config '{}'
This is begging to be automated! Here are a few options.
Simple deployment automation
This simple Python script runs as a Lambda function and deploys another function (how meta!) by incrementally increasing the weight of the new function version over a prescribed number of steps, while checking the health of the new version. If the health check fails, the alias is rolled back to its initial version. The health check is implemented as a simple check against the existence of Errors metrics in CloudWatch for the alias and new version.
# Rollout version 2 incrementally over 10 steps, with 120s between each step
aws lambda invoke --function-name SimpleDeployFunction --log-type Tail --payload \
'{"function-name": "MyFunction",
"alias-name": "MyAlias",
"new-version": "2",
"steps": 10,
"interval" : 120,
"type": "linear"
}' output
Description of input parameters
function-name: The name of the Lambda function to deploy
alias-name: The name of the alias used to invoke the Lambda function
new-version: The version identifier for the new version to deploy
steps: The number of times the new version weight is increased
interval: The amount of time (in seconds) to wait between weight updates
type: The function to use to generate the weights. Supported values: “linear”
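For illustration, here is a minimal Python sketch of the kind of rollout loop such a script might run with boto3. It is not the actual SimpleDeployFunction code: the parameter values mirror the example payload above, and the health check assumes the standard AWS/Lambda Errors metric with the Resource and ExecutedVersion dimensions.
import time
from datetime import datetime, timedelta
import boto3

# Example values matching the invocation payload above; the real script reads them from the event.
FUNCTION_NAME = "MyFunction"
ALIAS_NAME = "MyAlias"
NEW_VERSION = "2"
STEPS = 10
INTERVAL = 120  # seconds between weight updates

lambda_client = boto3.client("lambda")
cloudwatch = boto3.client("cloudwatch")

def new_version_has_errors():
    # Health check: were any Errors datapoints recorded for the new version behind the alias?
    stats = cloudwatch.get_metric_statistics(
        Namespace="AWS/Lambda",
        MetricName="Errors",
        Dimensions=[
            {"Name": "FunctionName", "Value": FUNCTION_NAME},
            {"Name": "Resource", "Value": FUNCTION_NAME + ":" + ALIAS_NAME},
            {"Name": "ExecutedVersion", "Value": NEW_VERSION},
        ],
        StartTime=datetime.utcnow() - timedelta(seconds=INTERVAL),
        EndTime=datetime.utcnow(),
        Period=60,
        Statistics=["Sum"],
    )
    return any(point["Sum"] > 0 for point in stats["Datapoints"])

original_version = lambda_client.get_alias(
    FunctionName=FUNCTION_NAME, Name=ALIAS_NAME)["FunctionVersion"]

for step in range(1, STEPS + 1):
    weight = step / STEPS  # the "linear" weight function
    if weight < 1.0:
        lambda_client.update_alias(
            FunctionName=FUNCTION_NAME, Name=ALIAS_NAME,
            RoutingConfig={"AdditionalVersionWeights": {NEW_VERSION: weight}})
    else:
        # Final step: promote the new version and clear the additional weights.
        lambda_client.update_alias(
            FunctionName=FUNCTION_NAME, Name=ALIAS_NAME,
            FunctionVersion=NEW_VERSION, RoutingConfig={})
    time.sleep(INTERVAL)
    if new_version_has_errors():
        # Failed health check: roll the alias back to the original version.
        lambda_client.update_alias(
            FunctionName=FUNCTION_NAME, Name=ALIAS_NAME,
            FunctionVersion=original_version, RoutingConfig={})
        raise RuntimeError("Deployment failed health check; rolled back")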
Because this runs as a Lambda function, it is subject to the maximum timeout of 5 minutes. This may be acceptable for many use cases, but to achieve a slower rollout of the new version, a different solution is required.
Step Functions workflow
This state machine performs essentially the same task as the simple deployment function, but it runs as an asynchronous workflow in AWS Step Functions. A nice property of Step Functions is that the maximum deployment timeout has now increased from 5 minutes to 1 year!
The step function incrementally updates the new version weight based on the steps parameter, waiting for some time based on the interval parameter, and performing health checks between updates. If the health check fails, the alias is rolled back to the original version and the workflow fails.
Because the state machine runs asynchronously, retrieving feedback on the deployment requires polling for the execution status using DescribeExecution or implementing an asynchronous notification (using SNS or email, for example) from the Rollback or Finalize functions. A CloudWatch alarm could also be created to alarm based on the “ExecutionsFailed” metric for the state machine.
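A rough sketch of that polling approach with boto3 (the state machine ARN and input payload are placeholders, not the actual workflow from this post):
import time
import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARN for the deployment state machine in your account.
state_machine_arn = "arn:aws:states:us-east-1:123456789012:stateMachine:LambdaCanaryDeploy"

execution = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input='{"function-name": "MyFunction", "alias-name": "MyAlias", '
          '"new-version": "2", "steps": 10, "interval": 120, "type": "linear"}')

# Poll DescribeExecution until the workflow leaves the RUNNING state.
while True:
    status = sfn.describe_execution(
        executionArn=execution["executionArn"])["status"]
    if status != "RUNNING":
        break
    time.sleep(30)

print("Deployment workflow finished with status:", status)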
A note on health checks and observability
Weighted rollouts like this are considerably more successful if the code is being exercised and monitored continuously. In this example, it would help to have some automation continuously invoking the alias and reporting metrics on these invocations, such as client-side success rates and latencies.
The absence of Lambda Errors metrics used in these examples can be misleading if the function is not getting invoked. It’s also recommended to instrument your Lambda functions with custom metrics, in addition to Lambda’s built-in metrics, that can be used to monitor health during deployments.
Extensibility
These examples could be easily extended in various ways to support different use cases. For example:
Health check implementations: CloudWatch alarms, automatic invocations with payload assertions, querying external systems, etc.
Weight increase functions: Exponential, geometric progression, single canary step, etc.
Custom success/failure notifications: SNS, email, CI/CD systems, service discovery systems, etc.
Traffic shifting with SAM and CodeDeploy
Using the Lambda UpdateAlias operation with additional version weights provides a powerful primitive for you to implement custom traffic shifting solutions for Lambda functions.
For those not interested in building custom deployment solutions, AWS CodeDeploy provides an intuitive turn-key implementation of this functionality integrated directly into the Serverless Application Model. Traffic-shifted deployments can be declared in a SAM template, and CodeDeploy manages the function rollout as part of the CloudFormation stack update. CloudWatch alarms can also be configured to trigger a stack rollback if something goes wrong.
It is often the simple features that provide the most value. As I demonstrated in this post, serverless architectures allow the complex deployment orchestration used in traditional applications to be replaced with a simple Lambda function or Step Functions workflow. By allowing invocation traffic to be easily weighted to multiple function versions, Lambda alias traffic shifting provides a simple but powerful feature that I hope empowers you to easily implement safe deployment workflows for your Lambda functions.
For OpenFest 2017 I wrote a challenge for the StorPool booth: whoever solved it got a t-shirt. The task sounds deceptively simple, and even I didn't realize at first that it isn't something you can solve in 10 minutes.
The task is the following: you have a directory with some number of files; figure out which of them are MD5 collisions and which are SHA1 collisions, and give the first letters of their names (4 files for MD5 and 4 for SHA1). My solution was to create, in a temporary directory, files named after the MD5 (and then the SHA1) sums, each containing the names and SHA256 sums of the files with that MD5 sum; then a sort on each of those files quickly shows which one contains different files (normally they should all be identical). If the goal is just to find which MD5 sum collides, you can count the unique SHA256 sums across all the files and see where the collisions are.
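This is not the original gen.sh or the sort-based one-liner described above, just a minimal Python sketch of the same grouping idea: files that share an MD5 (or SHA1) sum but have different SHA256 sums are the collisions.
import hashlib
import os
from collections import defaultdict

def digests(path):
    # Return (md5, sha1, sha256) hex digests of a file, reading it in chunks.
    hashes = [hashlib.md5(), hashlib.sha1(), hashlib.sha256()]
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            for h in hashes:
                h.update(chunk)
    return [h.hexdigest() for h in hashes]

def find_collisions(directory):
    by_md5 = defaultdict(set)    # md5  -> set of sha256 sums seen
    by_sha1 = defaultdict(set)   # sha1 -> set of sha256 sums seen
    names_md5 = defaultdict(list)
    names_sha1 = defaultdict(list)
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        m, s1, s256 = digests(path)
        by_md5[m].add(s256)
        names_md5[m].append(name)
        by_sha1[s1].add(s256)
        names_sha1[s1].append(name)
    # A collision group shares an MD5 (or SHA1) sum but contains files whose
    # SHA256 sums differ, i.e. genuinely different contents.
    md5_groups = [names_md5[m] for m, sums in by_md5.items() if len(sums) > 1]
    sha1_groups = [names_sha1[h] for h, sums in by_sha1.items() if len(sums) > 1]
    return md5_groups, sha1_groups

if __name__ == "__main__":
    md5_groups, sha1_groups = find_collisions(".")
    print("MD5 collision groups: ", md5_groups)
    print("SHA1 collision groups:", sha1_groups)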
I'm curious whether the task really is that hard (as far as I know, in two days only one person solved it within 10 minutes).
I also wonder whether anyone thought to ask Google for the checksums of the well-known demo SHA1/MD5 collisions, to check whether that's how I collected my files…
The code that generates the task is available at https://vasil.ludost.net/progs/storpool-of-task.tgz. Inside there is a gen.sh, which needs a small tweak to point it at where to create the files; when run, it creates a few files and gives you the answer. I haven't included the other pieces (the thing that runs as a login shell and the thing that sends the answers to the grader over Slack), but they aren't that interesting.
It’s almost always a good idea to support two-factor authentication (2FA), especially for back-office systems. 2FA comes in many different forms, some of which include SMS, TOTP, or even hardware tokens.
Enabling them requires a similar flow:
The user goes to their profile page (skip this if you want to force 2FA upon registration)
Clicks “Enable two-factor authentication”
Enters some data to enable the particular 2FA method (phone number, TOTP verification code, etc.)
The next time they log in, the login form requests the second factor (verification code) in addition to the username and password, and sends it along with the credentials
I will focus on Google Authenticator, which uses a TOTP (Time-based one-time password) for generating a sequence of verification codes. The idea is that the server and the client application share a secret key. Based on that key and on the current time, both come up with the same code. Of course, clocks are not perfectly synced, so there’s a window of a few codes that the server accepts as valid.
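To make the mechanism concrete, here is a small Python sketch of the RFC 6238/RFC 4226 computation that both sides perform; the GoogleAuth library discussed below does the equivalent in Java, and this is not its code:
import base64
import hashlib
import hmac
import struct
import time

def totp(secret_base32, timestamp=None, window=30, digits=6):
    # Time-based one-time password: HMAC-SHA1 over the current 30-second counter.
    key = base64.b32decode(secret_base32, casefold=True)
    counter = int((timestamp or time.time()) // window)
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F                                    # dynamic truncation (RFC 4226)
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % (10 ** digits)).zfill(digits)

def verify(secret_base32, submitted_code, drift=1):
    # Accept a small window of codes around "now" to tolerate clock skew.
    now = time.time()
    return any(totp(secret_base32, now + i * 30) == submitted_code
               for i in range(-drift, drift + 1))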
How to implement that with Java (on the server)? Using the GoogleAuth library. The flow is as follows:
The user goes to their profile page
Clicks “Enable two-factor authentication”
The server generates a secret key, stores it as part of the user profile and returns a URL to a QR code
The user scans the QR code with their Google Authenticator app thus creating a new profile in the app
The user enters the verification code shown in the app in a field that has appeared together with the QR code and clicks “confirm”
The server marks the 2FA as enabled in the user profile
If the user doesn’t scan the code or doesn’t verify the process, the user profile will contain just an orphaned secret key, and 2FA won’t be marked as enabled
There should be an option to later disable the 2FA from their user profile page
The most important bit from theoretical point of view here is the sharing of the secret key. The crypto is symmetric, so both sides (the authenticator app and the server) have the same key. It is shared via a QR code that the user scans. If an attacker has control on the user’s machine at that point, the secret can be leaked and thus the 2FA – abused by the attacker as well. But that’s not in the threat model – in other words, if the attacker has access to the user’s machine, the damage is already done anyway.
Upon login, the flow is as follows:
The user enters username and password and clicks “Login”
Using an AJAX request the page asks the server whether this email has 2FA enabled
If 2FA is not enabled, just submit the username & password form
If 2FA is enabled, the login form is not submitted, but instead an additional field is shown to let the user input the verification code from the authenticator app
After the user enters the code and presses login, the form can be submitted. This can use the same login button, a new “verify” button, or an entirely new screen containing just the verification input and button (hiding the username/password inputs).
The server then checks again whether the user has 2FA enabled and, if yes, verifies the verification code. If it matches, login is successful. If not, login fails and the user is allowed to reenter the credentials and the verification code. Note that you can return different responses depending on whether the username/password or the verification code is wrong. You can also attempt the login before even showing the verification code input; that approach is arguably better, because it doesn’t reveal to a potential attacker that the user has 2FA enabled.
While I’m speaking of username and password, the same applies to any other authentication method. After you get a success confirmation from an OAuth / OpenID Connect / SAML provider, or after you get a token from SecureLogin, you can request the second factor (code).
In code, the above processes look as follows (using Spring MVC; I’ve merged the controller and service layer for brevity; you can replace the @AuthenticationPrincipal bit with your way of supplying the currently logged in user details to the controllers). Assuming the methods are in a controller mapped to “/user/”:
@RequestMapping(value = "/init2fa", method = RequestMethod.POST)
@ResponseBody
public String initTwoFactorAuth(@AuthenticationPrincipal LoginAuthenticationToken token) {
User user = getLoggedInUser(token);
GoogleAuthenticatorKey googleAuthenticatorKey = googleAuthenticator.createCredentials();
user.setTwoFactorAuthKey(googleAuthenticatorKey.getKey());
dao.update(user);
return GoogleAuthenticatorQRGenerator.getOtpAuthURL(GOOGLE_AUTH_ISSUER, user.getEmail(), googleAuthenticatorKey); // account label shown in the authenticator app; assumes User exposes getEmail()
}
@RequestMapping(value = "/confirm2fa", method = RequestMethod.POST)
@ResponseBody
public boolean confirmTwoFactorAuth(@AuthenticationPrincipal LoginAuthenticationToken token, @RequestParam("code") int code) {
User user = getLoggedInUser(token);
boolean result = googleAuthenticator.authorize(user.getTwoFactorAuthKey(), code);
user.setTwoFactorAuthEnabled(result);
dao.update(user);
return result;
}
@RequestMapping(value = "/disable2fa", method = RequestMethod.GET)
@ResponseBody
public void disableTwoFactorAuth(@AuthenticationPrincipal LoginAuthenticationToken token) {
User user = getLoggedInUser(token);
user.setTwoFactorAuthKey(null);
user.setTwoFactorAuthEnabled(false);
dao.update(user);
}
@RequestMapping(value = "/requires2fa", method = RequestMethod.POST)
@ResponseBody
public boolean login(@RequestParam("email") String email) {
// TODO consider verifying the password here in order not to reveal that a given user uses 2FA
return userService.getUserDetailsByEmail(email).isTwoFactorAuthEnabled();
}
On the client side it’s a matter of simple AJAX requests to the above methods (sidenote: I kind of feel the term AJAX is no longer trendy, but I don’t know what else to call them. Async? Background? Javascript?).
The login form code depends very much on the existing login form you are using, but the point is to call the /requires2fa with the email (and password) to check if 2FA is enabled and then show a verification code input.
Overall, implementing two-factor authentication is simple and I’d recommend it for most systems, where security is more important than simplicity of the user experience.
This time I need it to run in a cluster on AWS, so I went on to set up such a cluster. Googling how to do it gives several interesting results, like this, this and this, but they are either incomplete, or outdated, or have too many irrelevant details. So they are only of moderate help.
My goal is to use CloudFormation (or Terraform potentially) to launch a stack which has a Cassandra auto-scaling group (in a single region) that can grow as easily as increasing the number of nodes in the group.
Also, in order to have the web application connect to Cassandra without hardcoding the node IPs, I wanted to have a load balancer in front of all Cassandra nodes that does the round-robin for me. The alternative for that would be to have a client-side round-robin, but that would mean some extra complexity on the client which seems avoidable with a load balancer in front of the Cassandra auto-scaling group.
The CloudFormation stack does the following:
Sets up 3 private subnets (1 per availability zone in the eu-west region)
Creates a security group that opens the incoming and outgoing ports Cassandra needs: 9042 for client connections and 7000/7001 for the nodes to gossip. Note that the ports are only accessible from within the VPC; no external connection is allowed, and SSH goes only through a bastion host.
Defines a TCP load balancer for port 9042 where all clients will connect. The load balancer requires a so-called “Target group” which is defined as well.
Configures an auto-scaling group, with a pre-configured number of nodes. The autoscaling group has a reference to the “target group”, so that the load balancer always sees all nodes in the auto-scaling group
Each node in the auto-scaling group is identical based on a launch configuration. The launch configuration runs a few scripts on initialization. These scripts will be run for every node – either initially, or in case a node dies and another one is spawned in its place, or when the cluster has to grow. The scripts are fetched from S3, where you can publish them (and version them) either manually, or with an automated process.
Note: this does not configure specific EBS volumes and in reality you may need to configure and attach them, if the instance storage is insufficient. Don’t worry about nodes dying, though, as data is safely replicated.
That was the easy part – a bunch of AWS resources and port configurations. The Cassandra-specific setup is a bit harder, as it requires understanding on how Cassandra functions.
The two scripts are setup-cassandra.sh and update-cassandra-cluster-config.py, so bash and python. Bash for setting-up the machine, and python for cassandra-specific stuff. Instead of the bash script one could use a pre-built AMI (image), e.g. with packer, but since only 2 pieces of software are installed, I thought it’s a bit of an overhead to support AMIs.
The bash script can be seen here, and simply installs Java 8 and the latest Cassandra, runs the python script, runs the Cassandra services and creates (if needed) a keyspace with proper replication configuration. A few notes here – the cassandra.yaml.template could be supplied via the cloudformation script instead of having it fetched via bash (and having to pass the bucket name); you could also have it fetched in the python script itself – it’s a matter of preference. Cassandra is not configured for use with SSL, which is generally a bad idea, but the SSL configuration is out of scope of the basic setup. Finally, the script waits for the Cassandra process to run (using a while/sleep loop) and then creates the keyspace if needed. The keyspace (=database) has to be created with a NetworkTopologyStrategy, and the number of replicas for the particular datacenter (=AWS region) has to be configured. The value is 3, for the 3 availability zones where we’ll have nodes. That means there’s a copy in each AZ (each AZ is seen as a “rack”, which in this setup is exactly what it is).
The python script does some very important configurations – without them the cluster won’t work. (I don’t work with Python normally, so feel free to criticize my Python code). The script does the following:
Gets the current autoscaling group details (using AWS EC2 APIs)
Sorts the instances by time
Fetches the first instance in the group in order to assign it as seed node
Sets the seed node in the configuration file (by replacing a placeholder)
Sets the listen_address (and therefore rpc_address) to the private IP of the node in order to allow Cassandra to listen for incoming connections
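The following is a rough Python sketch of those steps, not the actual script; the auto-scaling group name, region, and file paths are placeholders, and boto3 is assumed to be available on the node:
import boto3

# Placeholder values, not the actual configuration.
ASG_NAME = "cassandra-asg"
REGION = "eu-west-1"

def oldest_instance_private_ip(asg_name, region):
    # Pick the oldest instance in the auto-scaling group as the seed node.
    autoscaling = boto3.client("autoscaling", region_name=region)
    ec2 = boto3.client("ec2", region_name=region)
    group = autoscaling.describe_auto_scaling_groups(
        AutoScalingGroupNames=[asg_name])["AutoScalingGroups"][0]
    instance_ids = [i["InstanceId"] for i in group["Instances"]]
    reservations = ec2.describe_instances(InstanceIds=instance_ids)["Reservations"]
    instances = [i for r in reservations for i in r["Instances"]]
    instances.sort(key=lambda i: i["LaunchTime"])  # oldest first
    return instances[0]["PrivateIpAddress"]

def render_config(template_path, output_path, seeds, private_ip):
    # Substitute the ${seeds} and ${private_ip} placeholders in cassandra.yaml.template.
    with open(template_path) as f:
        config = f.read()
    config = config.replace("${seeds}", seeds).replace("${private_ip}", private_ip)
    with open(output_path, "w") as f:
        f.write(config)

seed_ip = oldest_instance_private_ip(ASG_NAME, REGION)
# The node's own private IP would normally come from the EC2 instance metadata endpoint;
# the value and the output path below are placeholders.
render_config("cassandra.yaml.template", "/etc/cassandra/conf/cassandra.yaml",
              seeds=seed_ip, private_ip="10.0.1.23")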
Designating the seed node is important, as all cluster nodes have to join the cluster by specifying at least one seed. You can get the first two nodes instead of just one, but it shouldn’t matter. Note that the seed node is not always fixed – it’s just the oldest node in the cluster. If at some point the oldest node is terminated, each new node will use the second oldest as seed.
What I haven’t shown is the cassandra.yaml.template file. It is basically a copy of the cassandra.yaml file from a standard Cassandra installation, with a few changes:
cluster_name is modified to match your application name. This is just for human-readable purposes, doesn’t matter what you set it to.
allocate_tokens_for_keyspace: your_keyspace is uncommented and the keyspace is set to match your main keyspace. This enables the new token distribution algorithm in Cassandra 3.0. It allows for evenly distributing the data across nodes.
endpoint_snitch: Ec2Snitch is set instead of the SimpleSnitch to make use of AWS metadata APIs. Note that this setup is in a single region. For multi-region there’s another snitch and some additional complications of exposing ports and changing the broadcast address.
As mentioned above, ${private_ip} and ${seeds} placeholders are placed in the appropriate places (listen_address and rpc_address for the IP) in order to allow substitution.
This setup lets you run a Cassandra cluster as part of your AWS stack, which is auto-scalable and doesn’t require any manual intervention – neither on setup, nor on scaling up. Well, allegedly – there may be issues that have to be resolved once you hit real-world use cases. And for clients to connect to the cluster, simply use the load balancer DNS name (you can write it to a config file on each application node).
Apache Hive is one of the most popular tools for analyzing large datasets stored in a Hadoop cluster using SQL. Data analysts and scientists use Hive to query, summarize, explore, and analyze big data.
With the introduction of Hive LLAP (Low Latency Analytical Processing), the notion of Hive being just a batch processing tool has changed. LLAP uses long-lived daemons with intelligent in-memory caching to circumvent batch-oriented latency and provide sub-second query response times.
This post provides an overview of Hive LLAP, including its architecture and common use cases for boosting query performance. You will learn how to install and configure Hive LLAP on an Amazon EMR cluster and run queries on LLAP daemons.
What is Hive LLAP?
Hive LLAP was introduced in Apache Hive 2.0, which provides very fast processing of queries. It uses persistent daemons that are deployed on a Hadoop YARN cluster using Apache Slider. These daemons are long-running and provide functionality such as I/O with DataNode, in-memory caching, query processing, and fine-grained access control. And since the daemons are always running in the cluster, it saves substantial overhead of launching new YARN containers for every new Hive session, thereby avoiding long startup times.
When Hive is configured in hybrid execution mode, small and short queries execute directly on LLAP daemons. Heavy lifting (like large shuffles in the reduce stage) is performed in YARN containers that belong to the application. Resources (CPU, memory, etc.) are obtained in a traditional fashion using YARN. After the resources are obtained, the execution engine can decide which resources are to be allocated to LLAP, or it can launch Apache Tez processors in separate YARN containers. You can also configure Hive to run all the processing workloads on LLAP daemons for querying small datasets at lightning fast speeds.
LLAP daemons are launched under YARN management to ensure that the nodes don’t get overloaded with the compute resources of these daemons. You can use scheduling queues to make sure that there is enough compute capacity for other YARN applications to run.
Why use Hive LLAP?
With many options available in the market (Presto, Spark SQL, etc.) for doing interactive SQL over data that is stored in Amazon S3 and HDFS, there are several reasons why using Hive and LLAP might be a good choice:
For those who are heavily invested in the Hive ecosystem and have external BI tools that connect to Hive over JDBC/ODBC connections, LLAP plugs in to their existing architecture without a steep learning curve.
It’s compatible with existing Hive SQL and other Hive tools, like HiveServer2, and JDBC drivers for Hive.
It has native support for security features with authentication and authorization (SQL standards-based authorization) using HiveServer2.
LLAP daemons are aware of the columns and records that are being processed, which enables you to enforce fine-grained access control.
It can use Hive’s vectorization capabilities to speed up queries, and Hive has better support for Parquet file format when vectorization is enabled.
It can take advantage of a number of Hive optimizations like merging multiple small files for query results, automatically determining the number of reducers for joins and groupbys, etc.
It’s optional and modular, so it can be turned on or off depending on the compute and resource requirements of the cluster. This lets you run other YARN applications concurrently without reserving a cluster specifically for LLAP.
How do you install Hive LLAP in Amazon EMR?
To install and configure LLAP on an EMR cluster, use the following bootstrap action (BA):
This BA downloads and installs Apache Slider on the cluster and configures LLAP so that it works with EMR Hive. For LLAP to work, the EMR cluster must have Hive, Tez, and Apache Zookeeper installed.
You can pass the following arguments to the BA.
--instances – Number of instances of LLAP daemon. Default: number of core/task nodes of the cluster.
--cache – Cache size per instance. Default: 20% of physical memory of the node.
--executors – Number of executors per instance. Default: number of CPU cores of the node.
--iothreads – Number of IO threads per instance. Default: number of CPU cores of the node.
--size – Container size per instance. Default: 50% of physical memory of the node.
--xmx – Working memory size. Default: 50% of container size.
--log-level – Log levels for the LLAP instance. Default: INFO.
LLAP example
This section describes how you can try the faster Hive queries with LLAP using the TPC-DS testbench for Hive on Amazon EMR.
Use the following AWS command line interface (AWS CLI) command to launch a 1+3 nodes m4.xlarge EMR 5.6.0 cluster with the bootstrap action to install LLAP:
After the cluster is launched, log in to the master node using SSH, and do the following:
Open the hive-tpcds folder: cd /home/hadoop/hive-tpcds/
Start the Hive CLI using the testbench configuration, create the required tables, and run the sample query:
hive -i testbench.settings
hive> source create_tables.sql;
hive> source query55.sql;
This sample query runs on a 40 GB dataset that is stored on Amazon S3. The dataset is generated using the data generation tool in the TPC-DS testbench for Hive. It results in output like the following:
The output shows that the query finished in about 47 seconds in LLAP mode. Now, to compare this to the execution time without LLAP, you can run the same workload using only Tez containers:
hive> set hive.llap.execution.mode=none;
hive> source query55.sql;
This query finished in about 80 seconds.
Query execution takes almost 1.7 times longer using plain YARN containers than running the query on LLAP daemons. And with every rerun of the query, you notice that the execution time decreases further by virtue of the LLAP daemons’ in-memory caching.
Conclusion
In this post, I introduced Hive LLAP as a way to boost Hive query performance. I discussed its architecture and described several use cases for the component. I showed how you can install and configure Hive LLAP on an Amazon EMR cluster and how you can run queries on LLAP daemons.
If you have questions about using Hive LLAP on Amazon EMR or would like to share your use cases, please leave a comment below.
Jigar Mistry is a Hadoop Systems Engineer with Amazon Web Services. He works with customers to provide them architectural guidance and technical support for processing large datasets in the cloud using open-source applications. In his spare time, he enjoys going for camping and exploring different restaurants in the Seattle area.
For teaching hacking/cybersecurity, I thought I’d create a list of the most obvious hacks of all time. Not the best hacks, the most sophisticated hacks, or the hacks with the biggest impact, but the most obvious hacks — ones that even the least knowledgeable among us should be able to understand. Below I propose some hacks that fit this bill, though in no particular order.
The reason I’m writing this is that my niece wants me to teach her some hacking. I thought I’d start with the obvious stuff first.
Shared Passwords
If you use the same password for every website, and one of those websites gets hacked, then the hacker has your password for all your websites. The reason your Facebook account got hacked wasn’t because of anything Facebook did, but because you used the same email-address and password when creating an account on “beagleforums.com”, which got hacked last year.
I’ve heard people say “I’m sure, because I choose a complex password and use it everywhere”. No, this is the very worst thing you can do. Sure, you can the use the same password on all sites you don’t care much about, but for Facebook, your email account, and your bank, you should have a unique password, so that when other sites get hacked, your important sites are secure.
And yes, it’s okay to write down your passwords on paper.
My accountant emails PDF statements encrypted with the last 4 digits of my Social Security Number. This is not encryption — a 4 digit number has only 10,000 combinations, and a hacker can guess all of them in seconds.
PIN numbers for ATM cards work because ATM machines are online, and the machine can reject your card after four guesses. PIN numbers don’t work for documents, because they are offline — the hacker has a copy of the document on their own machine, disconnected from the Internet, and can continue making bad guesses with no restrictions.
Passwords protecting documents must be long enough that even trillions upon trillions of guesses are not enough to find them.
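A quick back-of-the-envelope comparison, assuming an offline guessing rate of about a billion guesses per second (an order-of-magnitude figure, not a benchmark):
guesses_per_second = 10 ** 9       # assumed offline guessing rate, order of magnitude
pin_space = 10 ** 4                # a 4-digit code
password_space = 62 ** 16          # 16 random letters and digits

print(pin_space / guesses_per_second)                            # a tiny fraction of a second
print(password_space / guesses_per_second / (3600 * 24 * 365))   # on the order of a trillion years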
The lazy way of combining websites with databases is to combine user input with an SQL statement. This combines code with data, so the obvious consequence is that hackers can craft data to mess with the code.
No, this isn’t obvious to the general public, but it should be obvious to programmers. The moment you write code that adds unfiltered user-input to an SQL statement, the consequence should be obvious. Yet, “SQL injection” has remained one of the most effective hacks for the last 15 years because somehow programmers don’t understand the consequence.
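A minimal, self-contained illustration in Python with sqlite3 (any language and database behave the same way):
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, password TEXT)")
conn.execute("INSERT INTO users VALUES ('alice', 'secret')")

user_input = "' OR '1'='1"  # attacker-controlled value

# Vulnerable: user input is concatenated directly into the SQL statement,
# so the attacker's quote characters become part of the code.
query = "SELECT * FROM users WHERE name = '" + user_input + "'"
print(conn.execute(query).fetchall())   # returns every row

# Safe: a parameterized query keeps data separate from code.
print(conn.execute("SELECT * FROM users WHERE name = ?", (user_input,)).fetchall())  # returns nothing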
CGI shell injection is a similar issue. Back in the early days, when “CGI scripts” were a thing, it was really important, but these days, not so much, so I just included it with SQL. The consequence of executing shell code should’ve been obvious, but weirdly, it wasn’t. The IT guy at the company I worked for back in the late 1990s came to me and asked “this guy says we have a vulnerability, is he full of shit?”, and I had to answer “no, he’s right — obviously so”.
XSS (“Cross Site Scripting”) [*] is another injection issue, but this time at somebody’s web browser rather than a server. It works because websites will echo back what is sent to them. For example, if you search for Cross Site Scripting with the URL https://www.google.com/search?q=cross+site+scripting, then you’ll get a page back from the server that contains that string. If the string is JavaScript code rather than text, then some servers (though not Google) send back the code in the page in a way that it’ll be executed. This is most often used to hack somebody’s account: you send them an email or tweet a link, and when they click on it, the JavaScript gives control of the account to the hacker.
Cross site injection issues like this should probably be their own category, but I’m including it here for now.
In the C programming language, programmers first create a buffer, then read input into it. If the input is longer than the buffer, it overflows. The extra bytes overwrite other parts of the program, letting the hacker run code.
Again, it’s not a thing the general public is expected to know about, but is instead something C programmers should be expected to understand. They should know that it’s up to them to check the length and stop reading input before it overflows the buffer, that there’s no language feature that takes care of this for them.
We are three decades after the first major buffer overflow exploits, so there is no excuse for C programmers not to understand this issue.
What makes buffer overflows particularly obvious is the way they are wrapped in exploits, like in Metasploit. While the bug itself is obviously a bug, actually exploiting it can take some very non-obvious skill. However, once that exploit is written, any trained monkey can press a button and run it. That’s where we get the insult “script kiddie” from — referring to wannabe-hackers who never learn enough to write their own exploits, but who spend a lot of time running the exploit scripts written by better hackers than they are.
The first popular email server in the 1980s was called “SendMail”. It had a feature whereby if you send a “DEBUG” command to it, it would execute any code following the command. The consequence of this was obvious — hackers could (and did) upload code to take control of the server. This was used in the Morris Worm of 1988. Most Internet machines of the day ran SendMail, so the worm spread fast infecting most machines.
This bug was mostly ignored at the time. It was thought of as a theoretical problem that might only rarely be used to hack a system. Part of the motivation of the Morris Worm was to demonstrate the consequences — consequences that should’ve been obvious but were somehow dismissed by everyone.
I’m conflicted about whether I should add this or not, because here’s the deal: you are supposed to click on attachments and links within emails. That’s what they are there for. The difference between good and bad attachments/links is not obvious. Indeed, easy-to-use email systems make detecting the difference harder.
On the other hand, the consequences of bad attachments/links is obvious. That worms like ILOVEYOU spread so easily is because people trusted attachments coming from their friends, and ran them.
We have no solution to the problem of bad email attachments and links. Viruses and phishing are pervasive problems. Yet, we know why they exist.
Default and backdoor passwords
The Mirai botnet was caused by surveillance-cameras having default and backdoor passwords, and being exposed to the Internet without a firewall. The consequence should be obvious: people will discover the passwords and use them to take control of the bots.
Surveillance-cameras have the problem that they are usually exposed to the public, and can’t be reached without a ladder — often a really tall ladder. Therefore, you don’t want a button consumers can press to reset to factory defaults. You want a remote way to reset them. Therefore, they put backdoor passwords to do the reset. Such passwords are easy for hackers to reverse-engineer, and hence, take control of millions of cameras across the Internet.
The same reasoning applies to “default” passwords. Many users will not change the defaults, leaving a ton of devices hackers can hack.
Masscan and background radiation of the Internet
I’ve written a tool that can easily scan the entire Internet in a short period of time. It surprises people that this is possible, but it’s obvious from the numbers. Internet addresses are only 32-bits long, or roughly 4 billion combinations. A fast Internet link can easily handle 1 million packets-per-second, so the entire Internet can be scanned in about 4000 seconds, little more than an hour. It’s basic math.
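The arithmetic, spelled out:
ipv4_addresses = 2 ** 32      # roughly 4.3 billion
packets_per_second = 10 ** 6  # a fast link, per the figure above
print(ipv4_addresses / packets_per_second)  # ~4295 seconds, a little over an hour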
Because it’s so easy, many people do it. If you monitor your Internet link, you’ll see a steady trickle of packets coming in from all over the Internet, especially Russia and China, from hackers scanning the Internet for things they can hack.
People’s reaction to this scanning is weirdly emotional, taking it personally, such as:
Why are they hacking me? What did I do to them?
Great! They are hacking me! That must mean I’m important!
Grrr! How dare they?! How can I hack them back for some retribution!?
I find this odd, because obviously such scanning isn’t personal, the hackers have no idea who you are.
If you connect to the Starbucks WiFi, a hacker nearby can easily eavesdrop on your network traffic, because it’s not encrypted. Windows even warns you about this, in case you weren’t sure.
At DefCon, they have a “Wall of Sheep”, where they show passwords from people who logged onto stuff using the insecure “DefCon-Open” network. They are called “sheep” for not grasping the basic fact that unencrypted traffic is unencrypted.
To be fair, it’s actually non-obvious to many people. Even if the WiFi itself is not encrypted, SSL traffic is. They expect their services to be encrypted, without them having to worry about it. And in fact, most are, especially Google, Facebook, Twitter, Apple, and other major services that won’t allow you to log in anymore without encryption.
But many services (especially old ones) may not be encrypted. Unless users check and verify them carefully, they’ll happily expose passwords.
What’s interesting is that 10 years ago, most services used SSL only to encrypt the passwords, but then used unencrypted connections after that, relying on “cookies”. This allowed the cookies to be sniffed and stolen, allowing other people to share the login session. I used this on stage at BlackHat to connect to somebody’s GMail session. Google, and other major websites, fixed this soon after. But it should never have been a problem — because the sidejacking of cookies should have been obvious.
Tools: Wireshark, dsniff
Stuxnet LNK vulnerability
Again, this issue isn’t obvious to the public, but it should’ve been obvious to anybody who knew how Windows works.
When Windows loads a .dll, it first calls the function DllMain(). A Windows link file (.lnk) can load icons/graphics from the resources in a .dll file. It does this by loading the .dll file, thus calling DllMain. Thus, a hacker could put on a USB drive a .lnk file pointing to a .dll file, and thus, cause arbitrary code execution as soon as a user inserted a drive.
I say this is obvious because I did this, created .lnks that pointed to .dlls, but without hostile DllMain code. The consequence should’ve been obvious to me, but I totally missed the connection. We all missed the connection, for decades.
Social Engineering and Tech Support [* * *]
After posting this, many people have pointed out “social engineering”, especially of “tech support”. This probably should be up near #1 in terms of obviousness.
The classic example of social engineering is when you call tech support and tell them you’ve lost your password, and they reset it for you with a minimum of questions proving who you are. For example, you set the volume on your computer really loud and play the sound of a crying baby in the background and appear to be a bit frazzled and incoherent, which explains why you aren’t answering the questions they are asking. They, understanding your predicament as a new parent, will go the extra mile in helping you, resetting “your” password.
One of the interesting consequences is how it affects domain names (DNS). It’s quite easy in many cases to call up the registrar and convince them to transfer a domain name. This has been used in lots of hacks. It’s really hard to defend against. If a registrar charges only $9/year for a domain name, then it really can’t afford to provide very good tech support — or very secure tech support — to prevent this sort of hack.
Social engineering is such a huge problem, and obvious problem, that it’s outside the scope of this document. Just google it to find example after example.
A related issue that perhaps deserves its own section is OSINT [*], or “open-source intelligence”, where you gather public information about a target. For example, on the day the bank manager is out on vacation (which you got from their Facebook post) you show up and claim to be a bank auditor, and are shown into their office where you grab their backup tapes. (We’ve actually done this).
Telephones historically used what we call “in-band signaling”. That’s why when you dial on an old phone, it makes sounds — those sounds are sent no differently than the way your voice is sent. Thus, it was possible to make tone generators to do things other than simply dial calls. Early hackers (in the 1970s) would make tone-generators called “blue-boxes” and “black-boxes” to make free long distance calls, for example.
These days, “signaling” and “voice” are digitized, then sent as separate channels or “bands”. This is called “out-of-band signaling”. You can’t trick the phone system by generating tones. When your iPhone makes sounds when you dial, it’s entirely for your benefit and has nothing to do with how it signals the cell tower to make a call.
Early hackers, like the founders of Apple, are famous for having started their careers making such “boxes” to trick the phone system. The problem was obvious back in the day, which is why, as the phone system moved from analog to digital, the problem was fixed.
A simple trick is to put a virus on a USB flash drive, and drop it in a parking lot. Somebody is bound to notice it, stick it in their computer, and open the file.
This can be extended with tricks. For example, you can put a file labeled “third-quarter-salaries.xlsx” on the drive that required macros to be run in order to open. It’s irresistible to other employees who want to know what their peers are being paid, so they’ll bypass any warning prompts in order to see the data.
Another example is to go online and get custom USB sticks made printed with the logo of the target company, making them seem more trustworthy.
We also did a trick of taking an Adobe Flash game, “Punch the Monkey”, and replacing the monkey with the logo of a competitor of our target. They not only played the game (infecting themselves with our virus), but gave it to others inside the company to play, infecting others, including the CEO.
Search engines like Google will index your website — your entire website. Frequently companies put things on their website without much protection because they are nearly impossible for users to find. But Google finds them, then indexes them, causing them to pop up with innocent searches.
There are books written on “Google hacking” explaining what search terms to look for, like “not for public release”, in order to find such documents.
At the top of every browser is what’s called the “URL”. You can change it. Thus, if you see a URL that looks like this:
http://www.example.com/documents?id=138493
Then you can edit it to see the next document on the server:
http://www.example.com/documents?id=138494
The owner of the website may think they are secure, because nothing points to this document, so the Google search won’t find it. But that doesn’t stop a user from manually editing the URL.
An example of this is a big Fortune 500 company that posts the quarterly results to the website an hour before the official announcement. Simply editing the URL from previous financial announcements allows hackers to find the document, then buy/sell the stock as appropriate in order to make a lot of money.
Another example is the classic case of Andrew “Weev” Auernheimer who did this trick in order to download the account email addresses of early owners of the iPad, including movie stars and members of the Obama administration. It’s an interesting legal case because on one hand, techies consider this so obvious as to not be “hacking”. On the other hand, non-techies, especially judges and prosecutors, believe this to be obviously “hacking”.
For decades now, online gamers have figured out an easy way to win: just flood the opponent with Internet traffic, slowing their network connection. This is called a DoS, which stands for “Denial of Service”. DoSing game competitors is often a teenager’s first foray into hacking.
A variant of this is when you hack a bunch of other machines on the Internet, then command them to flood your target. (The hacked machines are often called a “botnet”, a network of robot computers). This is called DDoS, or “Distributed DoS”. At this point, it gets quite serious, as instead of competitive gamers hackers can take down entire businesses. Extortion scams, DDoSing websites then demanding payment to stop, is a common way hackers earn money.
Another form of DDoS is “amplification”. Sometimes when you send a packet to a machine on the Internet it’ll respond with a much larger response, either a very large packet or many packets. The hacker can then send a packet to many of these sites, “spoofing” or forging the IP address of the victim. This causes all those sites to then flood the victim with traffic. Thus, with a small amount of outbound traffic, the hacker can flood the inbound traffic of the victim.
This is one of those things that has worked for 20 years, because it’s so obvious teenagers can do it, yet there is no obvious solution. President Trump’s executive order on cybersecurity specifically demanded that his government come up with a report on how to address this, but it’s unlikely that they’ll come up with any useful strategy.
In the world of data science, users must often sacrifice cluster set-up time to allow for complex usability scenarios. Amazon EMR allows data scientists to spin up complex cluster configurations easily, and to be up and running with complex queries in a matter of minutes.
Data scientists often use scheduling applications such as Oozie to run jobs overnight. However, Oozie can be difficult to configure when you are trying to use popular Python packages (such as “pandas,” “numpy,” and “statsmodels”), which are not included by default.
One such popular platform that contains these types of packages (and more) is Anaconda. This post focuses on setting up an Anaconda platform on EMR, with an intent to use its packages with Oozie. I describe how to run jobs using a popular open source scheduler like Oozie.
Walkthrough
For this post, you walk through the following tasks:
Create an EMR cluster.
Download Anaconda on your master node.
Configure Oozie.
Test the steps.
Create an EMR cluster
Spin up an Amazon EMR cluster using the console or the AWS CLI. Use the latest release, and include Apache Hadoop, Apache Spark, Apache Hive, and Oozie.
To create a three-node cluster in the us-east-1 region, issue an AWS CLI command such as the following. The command must be entered as a single line; it is broken onto multiple lines here for readability purposes only.
At the time of publication, Anaconda 4.4 is the most current version available. For the download link location for the latest Python 2.7 version (Python 3.6 may encounter issues), see https://www.continuum.io/downloads. Open the context (right-click) menu for the Python 2.7 download link, choose Copy Link Location, and use this value in the previous wget command.
This post used the Anaconda 4.4 installation. If you have a later version, it is shown in the downloaded filename: “anaconda2-<version number>-Linux-x86_64.sh”.
Run this downloaded script and follow the on-screen installer prompts.
For an installation directory, select somewhere with enough space on your cluster, such as “/mnt/anaconda/”.
The process should take approximately 1–2 minutes to install. When prompted if you “wish the installer to prepend the Anaconda2 install location”, select the default option of [no].
After you are done, export the PATH to include this new Anaconda installation:
export PATH=/mnt/anaconda/bin:$PATH
Zip up the Anaconda installation:
cd /mnt/anaconda/
zip -r anaconda.zip .
The zip process may take 4–5 minutes to complete.
(Optional) Upload this anaconda.zip file to your S3 bucket for easier inclusion into future EMR clusters. This removes the need to repeat the previous steps for future EMR clusters.
Configure Oozie
Next, you configure Oozie to use Pyspark and the Anaconda platform.
Get the location of your Oozie sharelibupdate folder. Issue the following command and take note of the “sharelibDirNew” value:
oozie admin -sharelibupdate
For this post, this value is “hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136”.
Copy the required Pyspark files into Oozie’s sharelib location. The following files are required for Oozie to be able to run Pyspark commands:
pyspark.zip
py4j-0.10.4-src.zip
These are located on the EMR master instance in the location “/usr/lib/spark/python/lib/”, and must be put into the Oozie sharelib spark directory. This location is the value of the sharelibDirNew parameter value (shown above) with “/spark/” appended, that is, “hdfs://ip-192-168-4-200.us-east-1.compute.internal:8020/user/oozie/share/lib/lib_20170616133136/spark/”.
(Optional) Verify that it was transferred successfully with the following command:
hdfs dfs -ls /tmp/myLocation/
On your master node, execute the following command:
export PYSPARK_PYTHON=/mnt/anaconda/bin/python
Set the PYSPARK_PYTHON environment variable on the executor nodes. Put the following configurations in your “spark-opts” values in your Oozie workflow.xml file:
Put the “myPysparkProgram.py” into the location mentioned between the “<jar>xxxxx</jar>” tags in your workflow.xml. In this example, the location is “hdfs:///user/oozie/apps/”. Use the following command to move the “myPysparkProgram.py” file to the correct location:
Sample output from myPysparkProgram.py:
This is a sentence. 12
So is this 12
This is also a sentence 12
Summary
The myPysparkProgram.py has successfully imported the numpy library from the Anaconda platform and has produced some output with it. If you tried to run this using standard Python, you’d encounter the following error:
Now when your Python job runs in Oozie, any imported packages that are implicitly imported by your Pyspark script are imported into your job within Oozie directly from the Anaconda platform. Simple!
If you have questions or suggestions, please leave a comment below.
John Ohle is an AWS BigData Cloud Support Engineer II for the BigData team in Dublin. He works to provide advice and solutions to our customers on their Big Data projects and workflows on AWS. In his spare time, he likes to play music, learn, develop tools and write documentation to further help others – both colleagues and customers alike.
In this blog post, I demonstrate the step-by-step process for end-to-end account creation in Organizations as well as how to automate the entire process. I also show how to move a new account into an organizational unit (OU).
Process overview
The following process flow diagram illustrates the steps required to create an account, configure the account, and then move it into an OU so that the account can take advantage of the centralized SCP functionality in Organizations. The tasks in the blue nodes occur in the master account in the organization in question, and the task in the orange node occurs in the new member account I create. In this post, I provide a script (in both Bash/CLI and Python) that you can use to automate this account creation process, and I walk through each step shown in the diagram to explain the process in detail. For the purposes of this post, I use the AWS CLI in combination with CloudFormation to create and configure an account.
The account creation process
Follow the steps in this section to create an account, configure it, and move it into an OU. I am also providing a script and CloudFormation templates that you can use to automate the entire process.
1. Sign in to AWS Organizations
In order to create an account, you must sign in to your organization’s master account with a minimum of the following permissions:
organizations:DescribeOrganization
organizations:CreateAccount
2. Create a new member account
After signing in to your organization’s master account, create a new member account. Before you can create the member account, you need three pieces of information:
An account name – The friendly name of the member account, which you can find on the Accounts tab in the master account.
An email address – The email address of the owner of the new member account. This email address is used by AWS when we need to contact the account owner.
An IAM role name – The name of an IAM role that Organizations automatically preconfigures in the new member account. This role trusts the master account, allowing users in the master account to assume the role, as permitted by the master account administrator. The role also has administrator permissions in the new member account. If you do not change the role’s name, the name defaults to OrganizationAccountAccessRole.
The following AWS CLI command creates a new member account.
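As a rough sketch, the equivalent call with boto3 (using the placeholder values described below) looks like this:
import boto3

org = boto3.client("organizations")

# newAccEmail, newAccName, and roleName are the placeholder values explained below.
response = org.create_account(
    Email="newAccEmail",
    AccountName="newAccName",
    RoleName="roleName",
)
request_id = response["CreateAccountStatus"]["Id"]  # needed for the status check in Step 3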
To explain the placeholder values in the preceding command that you must update with your own values:
newAccEmail – The email address of the owner of the new member account. This email address must not already be associated with another AWS account.
newAccName – The friendly name of the new member account.
roleName – The name of an IAM role that Organizations automatically preconfigures in the new member account. The default name is OrganizationAccountAccessRole.
This CLI command returns a request_id that uniquely identifies the request, a value that is required in Step 3.
Important: When you create an account using Organizations, you currently cannot remove this account from your organization. This, in turn, can prevent you from later deleting the organization.
3. Verify account creation
Account creation may take a few seconds to complete, so before doing anything with the newly created account, you must first verify the account creation status. To check the status, you must have at least the following permission:
organizations:DescribeCreateAccountStatus
The following CLI command, with the request_id returned in the previous step as an input parameter, verifies that the account was created:
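A boto3 sketch of the same check, reusing the request_id captured in Step 2:
import boto3

org = boto3.client("organizations")

status = org.describe_create_account_status(
    CreateAccountRequestId=request_id)["CreateAccountStatus"]
print(status["State"])  # IN_PROGRESS, SUCCEEDED, or FAILED
if status["State"] == "SUCCEEDED":
    new_account_id = status["AccountId"]  # the ID of the newly created member account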
The command returns the state of your account creation request and can have three different values: IN_PROGRESS, SUCCEEDED, and FAILED.
4. Assume a role
After you have verified that the new account has been created, configure the account. In order to configure the newly created account, you must sign in with a user who has permission to assume the role submitted in the createAccount API call. In the example in Step 2, I used the default role name OrganizationAccountAccessRole; however, if you revised the name of the role, you must use that revised name when assuming the role. Note that when an account is created from within an organization, cross-account trust between the master and programmatically created accounts is automatically established.
role-session-name – An identifier for the assumed role session.
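A boto3 sketch of assuming this role; the account ID below is a placeholder, and the role name must match the one used when the account was created:
import boto3

sts = boto3.client("sts")

new_account_id = "123456789012"  # placeholder: the new member account's ID

credentials = sts.assume_role(
    RoleArn="arn:aws:iam::" + new_account_id + ":role/OrganizationAccountAccessRole",
    RoleSessionName="account-setup-session",
)["Credentials"]

# Clients built with these temporary credentials act inside the new member account.
member_cfn = boto3.client(
    "cloudformation",
    aws_access_key_id=credentials["AccessKeyId"],
    aws_secret_access_key=credentials["SecretAccessKey"],
    aws_session_token=credentials["SessionToken"],
)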
5. Configure the new account
After you assume the role, build the new account’s networking, IAM, and governance resources as explained in this section. Again, to learn more about and download the account creation script and the templates that can automate this process, see “Automating the entire end-to-end process” later in this post.
Networking – Amazon VPC, web access control lists (ACLs), and Internet gateway:
Create a new Amazon VPC to enable you to launch AWS resources in a virtual network that you define.
Run the script at the end of this post to create a VPC with two subnets (one public subnet and one private subnet) in each of two Availability Zones.
Set up web ACLs to control traffic in and out of the subnets. You can set up network ACLs with rules similar to your security groups in order to add an additional layer of security to your VPC.
Connect your VPC to remote networks by using a VPN connection.
If the resources in the VPC require access to the Internet, create an Internet gateway to allow communication between instances in your VPC and the Internet.
IAM – Identity provider (IdP), IAM policies, and IAM roles:
Many customers use enterprise federated authentication (such as Active Directory) to manage users and permissions centrally outside AWS. If you use federated authentication, set up an IdP.
Use AWS managed policies or your customer managed policies to manage access to your AWS resources.
Governance – AWS Config Rules:
Create AWS Config rules to help manage and enforce standards for resources deployed on AWS.
Develop a tagging strategy that specifies a minimum set of tags required on every taggable resource. A tagging rule checks that all resources created or edited fulfill this requirement. A noncompliance report is created to document resources that do not meet the AWS Config rule. AWS Lambda scripts can also be launched as a result of AWS Config rules.
6. Move the new account into an OU
Before allowing your development teams to access the new member account that you configured in the previous steps, apply an SCP to the account to limit the API calls that can be made by all users. To do this, you must move the member account into an OU that has an SCP attached to it.
An OU is a container for accounts. It can contain other OUs, allowing you to create a hierarchy that resembles an upside-down tree with a “root” at the top and OU “branches” that reach down, ending with accounts that are the “leaves” of the tree. When you attach a policy to one of the nodes in the hierarchy, it affects all the branches (OUs) and leaves (accounts) under it. An OU can have exactly one parent, and currently, each account can be a member of exactly one OU.
The following CLI command moves an account into an OU.
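For example, using placeholders for the IDs:
aws organizations move-account --account-id <account_id> --source-parent-id <source_parent_id> --destination-parent-id <destination_parent_id>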
The placeholder values in the preceding command, which you must replace with your own values, are:
account_id – The unique identifier (ID) of the account you want to move.
source_parent_id – The unique ID of the root or OU from which you want to move the account.
destination_parent_id – The unique ID of the root or OU to which you want to move the account.
7. Reduce the IAM role permissions
The OrganizationAccountAccessRole is created with full administrative permissions to enable the creation and development of the new member account. After you complete the development process and you have moved the member account into an OU, reduce the permissions of OrganizationAccountAccessRole to match your anticipated use of this role going forward.
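For example (ReducedAccess and reduced-access-policy.json are placeholders for your own scoped-down policy), you could attach a reduced inline policy and then detach the AdministratorAccess managed policy that gives the role its full administrative permissions:
aws iam put-role-policy --role-name OrganizationAccountAccessRole --policy-name ReducedAccess --policy-document file://reduced-access-policy.json
aws iam detach-role-policy --role-name OrganizationAccountAccessRole --policy-arn arn:aws:iam::aws:policy/AdministratorAccess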
Automating the entire end-to-end process
To help you fully automate the process of creating new member accounts, setting up those accounts, and moving new member accounts into an OU, I am providing a script in both Bash/CLI and Python. You can modify or call additional CloudFormation templates as needed.
Download the script and CloudFormation templates
Download the script and CloudFormation templates to help you automate this end-to-end process. The global variables in the script are set in the opening lines of code. Update these variables’ values, and they will flow as input parameters to the API commands when the script is executed. I have prepopulated the roleName by using AWS best practices nomenclature, but you can use a custom name.
I am including the following descriptions of the elements of the script to give you a better idea of how the script works.
Bash/CLI:
Organization-new-acc.sh – An example shell script that includes parameters, account creation, and calls to the JSON sample templates for each of the three subtasks in Step 5 earlier in this post.
CF-VPC.json – An example CloudFormation template that creates and configures a VPC in the new member account. Each AWS account must have at least one VPC as a networking construct where you can deploy customer resources. Though AWS does create a default VPC when an account is created, you must configure that VPC to meet your needs. This includes creating subnets with specific IP Classless Inter-Domain Routing (CIDR) blocks, creating gateways (including an Internet gateway, a customer gateway, a VPN tunnel, AWS Storage Gateway, Amazon API Gateway, and a NAT gateway), and creating VPC peering connections. Network ACLs are also part of this process to limit the source IP addresses and ports that can access the VPC. The VPC created by this script includes four subnets across two Availability Zones. Two of the subnets are public and two are private.
CF-IAM.json – An example CloudFormation template that creates IAM roles in the new member account. As part of a security baseline, you should develop a standard set of IAM roles and related policies. Update this template with the IAM role definitions and policies you want to create in the member account to control privileges and access.
CF-ConfigRules.json – An example CloudFormation template that creates an AWS Config rule to enforce tagging standards on resources created in the new account.
Organization_Output.docx – Example output of the results from running Organization-new-acc.sh.
Python:
Create_account_with_iam.py – An example Python script that creates an account, moves it into an OU, applies an SCP, and then calls additional templates to deploy resources. CF-VPC.json can be called by this script if you first customize the .json file.
Baseline.yml – An example CloudFormation template for creating a new IAM administrative user, IAM user group, IAM role, and IAM policy in the account.
Summary
In this post, I have demonstrated the step-by-step process for end-to-end account creation in Organizations as well as how to automate the entire process. I also showed how to move a new account into an OU. This solution should save you some time and help you avoid common issues that tend to crop up in the manual account-creation process. To learn more about the features of Organizations, see the AWS Organizations User Guide. For more information about the APIs used in this post, see the Organizations API Reference.
If you have comments about this blog post, submit them in the “Comments” section below. If you have implementation or troubleshooting questions, start a new thread on the Organizations forum.
One of the great benefits of Amazon S3 is the ability to host, share, or consume public data sets. This provides transparency into data to which an external data scientist or developer might not normally have access. By exposing the data to the public, you can glean many insights that would have been difficult with a data silo.
The openFDA project creates easy access to the high value, high priority, and public access data of the Food and Drug Administration (FDA). The data has been formatted and documented in consumer-friendly standards. Critical data related to drugs, devices, and food has been harmonized and can easily be called by application developers and researchers via API calls. OpenFDA has published two whitepapers that drill into the technical underpinnings of the API infrastructure as well as how to properly analyze the data in R. In addition, FDA makes openFDA data available on S3 in raw format.
In this post, I show how to use S3, Amazon EMR, and Amazon Athena to analyze the drug adverse events dataset. A drug adverse event is an undesirable experience associated with the use of a drug, including serious drug side effects, product use errors, product quality problems, and therapeutic failures.
Data considerations
Keep in mind that this data does have limitations. In the United States, adverse events are submitted to the FDA voluntarily by consumers, so there may not be reports for all events that occurred, and there is no certainty that a reported event was actually caused by the product. The FDA does not require that a causal relationship between a product and an event be proven, and reports do not always contain enough detail to evaluate an event. Because of this, there is no way to identify the true number of events. The important takeaway is that the information in this data has not been verified to establish cause-and-effect relationships. Despite this disclaimer, many interesting insights and much value can still be derived from the data to accelerate drug safety research.
Data analysis using SQL
For application developers who want to perform targeted searching and lookups, the API endpoints provided by the openFDA project are “ready to go” for software integration using a standard API powered by Elasticsearch, NodeJS, and Docker. However, for data analysis purposes, it is often easier to work with the data using SQL and statistical packages that expect a SQL table structure. For large-scale analysis, APIs often have query limits, such as 5000 records per query. This can cause extra work for data scientists who want to analyze the full dataset instead of small subsets of data.
To address the concern of requiring all the data in a single dataset, the openFDA project released the full 100 GB of harmonized data files that back the openFDA project onto S3. Athena is an interactive query service that makes it easy to analyze data in S3 using standard SQL. It’s a quick and easy way to answer your questions about adverse events and aspirin that does not require you to spin up databases or servers.
While you could point tools directly at the openFDA S3 files, you can achieve greatly improved performance and easier use of the data by following some of the preparation steps later in this post.
Architecture
This post explains how to use the following architecture to take the raw data provided by openFDA, leverage several AWS services, and derive meaning from the underlying data.
Steps:
Load the openFDA /drug/event dataset into Spark and convert it to gzip to allow for streaming.
Transform the data in Spark and save the results as a Parquet file in S3.
Query the S3 Parquet file with Athena.
Perform visualization and analysis of the data in R and Python on Amazon EC2.
Optimizing public data sets: A primer on data preparation
Those who want to jump right into preparing the files for Athena may want to skip ahead to the next section.
Transforming, or pre-processing, files is a common task when working with many public data sets. Before you jump into the specific steps for transforming the openFDA data files into a format optimized for Athena, I thought it would be worthwhile to provide a quick exploration of the problem.
Making a dataset in S3 efficiently accessible with minimal transformation for the end user has two key elements:
Partitioning the data into objects that contain a complete part of the data (such as data created within a specific month).
Using file formats that make it easy for applications to locate subsets of data (for example, gzip, Parquet, ORC, etc.).
With these two key elements in mind, you can now apply transformations to the openFDA adverse event data to prepare it for Athena. You might find the data techniques employed in this post to be applicable to many of the questions you might want to ask of the public data sets stored in Amazon S3.
Before you get started, I encourage those who are interested in doing deeper healthcare analysis on AWS to first read the AWS HIPAA Compliance whitepaper. It covers the information necessary for processing and storing protected health information (PHI).
Also, the adverse event analysis shown for aspirin is strictly for demonstration purposes and should not be used for any real decision or taken as anything other than a demonstration of AWS capabilities. However, there have been robust case studies published that have explored a causal relationship between aspirin and adverse reactions using OpenFDA data. If you are seeking research on aspirin or its risks, visit organizations such as the Centers for Disease Control and Prevention (CDC) or the Institute of Medicine (IOM).
Preparing data for Athena
For this walkthrough, you will start with the FDA adverse events dataset, which is stored as JSON files within zip archives on S3. You then convert it to Parquet for analysis. Why do you need to convert it? The original data download is stored in objects that are partitioned by quarter.
Here is a small sample of what you find in the adverse events (/drugs/event) section of the openFDA website.
If you were looking for events that happened in a specific quarter, this is not a bad solution. For most other scenarios, such as looking across the full history of aspirin events, it requires you to access a lot of data that you won’t need. The zip file format is not ideal for using data in place because zip readers must have random access to the file, which means the data can’t be streamed. Additionally, the zip files contain large JSON objects.
To read the data in these JSON files, you must either use a streaming JSON decoder or decode the JSON on a machine with a significant amount of RAM. Opening up these files for public consumption is a great start. However, you still need to prepare the data with a few lines of Spark code so that the JSON can be streamed.
Step 1: Convert the file types
Using Apache Spark on EMR, you can extract all of the zip files and pull out the events from the JSON files. To do this, use the Scala code below to decompress the zip files and write their contents out as text files. In addition, compress the output with gzip to improve Spark’s performance and reduce your overall storage footprint. The Scala code can be run in either the Spark Shell or in an Apache Zeppelin notebook on your EMR cluster.
If you are unfamiliar with either Apache Zeppelin or the Spark Shell, the following posts serve as great references:
import scala.io.Source
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import org.apache.hadoop.io.compress.GzipCodec
// Input Directory
val inputFile = "s3://download.open.fda.gov/drug/event/2015q4/*.json.zip";
// Output Directory
val outputDir = "s3://{YOUR OUTPUT BUCKET HERE}/output/2015q4/";
// Read the zip files from the input directory as binary streams
val zipFiles = sc.binaryFiles(inputFile);
// Process each zip file to extract the JSON as a text file and save it
// in the output directory
val rdd = zipFiles.flatMap((file: (String, PortableDataStream)) => {
  val zipStream = new ZipInputStream(file._2.open)
  val entry = zipStream.getNextEntry // position the stream at the first entry
  val iter = Source.fromInputStream(zipStream).getLines
  iter
}).map(_.replaceAll("\\s+", "")).saveAsTextFile(outputDir, classOf[GzipCodec])
Step 2: Transform JSON into Parquet
With just a few more lines of Scala code, you can use Spark’s abstractions to convert the JSON into a Spark DataFrame and then export the data back to S3 in Parquet format.
Spark requires the JSON to be in JSON Lines format to be parsed correctly into a DataFrame.
// Output Parquet directory
val outputDir = "s3://{YOUR OUTPUT BUCKET NAME}/output/drugevents"
// Input json file
val inputJson = "s3://{YOUR OUTPUT BUCKET NAME}/output/2015q4/*"
// Load dataframe from json file multiline
val df = spark.read.json(sc.wholeTextFiles(inputJson).values)
// Extract results from dataframe
val results = df.select("results")
// Save it to Parquet
results.write.parquet(outputDir)
Step 3: Create an Athena table
With the data cleanly prepared and stored in S3 using the Parquet format, you can now place an Athena table on top of it to get a better understanding of the underlying data.
Because the openFDA data structure incorporates several layers of nesting, it can be a complex process to try to manually derive the underlying schema in a Hive-compatible format. To shorten this process, you can load the top row of the DataFrame from the previous step into a Hive table within Zeppelin and then extract the “create table” statement from SparkSQL.
results.createOrReplaceTempView("data")
val top1 = spark.sql("select * from data tablesample(1 rows)")
top1.write.format("parquet").mode("overwrite").saveAsTable("drugevents")
val show_cmd = spark.sql("show create table drugevents").show(1, false)
This returns a “create table” statement that you can almost paste directly into the Athena console. Make some small modifications (adding the word “external” and replacing “using” with “stored as”), and then execute the code in the Athena query editor. The table is created.
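As a rough illustration only (the column list and S3 location below are placeholders; the statement generated from the real, nested openFDA schema is considerably longer), the edited statement has this general shape:
CREATE EXTERNAL TABLE drugevents (
  safetyreportid string,
  receiptdate string,
  medicinalproduct string,
  reactionmeddrapt string
)
STORED AS PARQUET
LOCATION 's3://{YOUR OUTPUT BUCKET NAME}/output/drugevents/'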
For the openFDA data, the DDL returns all fields as strings, because the date format used in the dataset does not conform to the yyyy-mm-dd hh:mm:ss[.f...] format required by Hive. For this analysis, the string format works fine, but you could extend this code to use a Presto function to convert the strings into time stamps.
With the Athena table in place, you can start to explore the data by running ad hoc queries within Athena or doing more advanced statistical analysis in R.
Using SQL and R to analyze adverse events
Using the openFDA data with Athena makes it very easy to translate your questions into SQL code and perform quick analysis on the data. After you have prepared the data for Athena, you can begin to explore the relationship between aspirin and adverse drug events, as an example. One of the most common metrics to measure adverse drug events is the Proportional Reporting Ratio (PRR). It is defined as:
PRR = (m/n) / ((M-m)/(N-n))
where:
m = number of reports with the drug and the event
n = number of reports with the drug
M = number of reports with the event in the database
N = number of reports in the database
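For example (made-up numbers, purely to illustrate the arithmetic): if a year has m = 10 reports with the drug and the event, n = 1,000 reports with the drug, M = 500 reports with the event, and N = 100,000 reports in total, then PRR = (10/1000) / ((500-10)/(100000-1000)) = 0.0100 / 0.0049 ≈ 2.0, meaning the event shows up about twice as often in reports for that drug as in reports for everything else.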
Gastrointestinal haemorrhage has the highest PRR of any reaction to aspirin when viewed in aggregate. One question you may want to ask is how the PRR has trended on a yearly basis for gastrointestinal haemorrhage since 2005.
Using the following query in Athena, you can see the PRR trend of “GASTROINTESTINAL HAEMORRHAGE” reactions with “ASPIRIN” since 2005:
with drug_and_event as
(select rpad(receiptdate, 4, 'NA') as receipt_year
, reactionmeddrapt
, count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as reports_with_drug_and_event
from fda.drugevents
where rpad(receiptdate,4,'NA')
between '2005' and '2015'
and medicinalproduct = 'ASPIRIN'
and reactionmeddrapt= 'GASTROINTESTINAL HAEMORRHAGE'
group by reactionmeddrapt, rpad(receiptdate, 4, 'NA')
), reports_with_drug as
(
select rpad(receiptdate, 4, 'NA') as receipt_year
, count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as reports_with_drug
from fda.drugevents
where rpad(receiptdate,4,'NA')
between '2005' and '2015'
and medicinalproduct = 'ASPIRIN'
group by rpad(receiptdate, 4, 'NA')
), reports_with_event as
(
select rpad(receiptdate, 4, 'NA') as receipt_year
, count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as reports_with_event
from fda.drugevents
where rpad(receiptdate,4,'NA')
between '2005' and '2015'
and reactionmeddrapt= 'GASTROINTESTINAL HAEMORRHAGE'
group by rpad(receiptdate, 4, 'NA')
), total_reports as
(
select rpad(receiptdate, 4, 'NA') as receipt_year
, count(distinct (concat(safetyreportid,receiptdate,reactionmeddrapt))) as total_reports
from fda.drugevents
where rpad(receiptdate,4,'NA')
between '2005' and '2015'
group by rpad(receiptdate, 4, 'NA')
)
select drug_and_event.receipt_year,
(1.0 * drug_and_event.reports_with_drug_and_event/reports_with_drug.reports_with_drug)/ (1.0 * (reports_with_event.reports_with_event- drug_and_event.reports_with_drug_and_event)/(total_reports.total_reports-reports_with_drug.reports_with_drug)) as prr
, drug_and_event.reports_with_drug_and_event
, reports_with_drug.reports_with_drug
, reports_with_event.reports_with_event
, total_reports.total_reports
from drug_and_event
inner join reports_with_drug on drug_and_event.receipt_year = reports_with_drug.receipt_year
inner join reports_with_event on drug_and_event.receipt_year = reports_with_event.receipt_year
inner join total_reports on drug_and_event.receipt_year = total_reports.receipt_year
order by drug_and_event.receipt_year
One nice feature of Athena is that you can quickly connect to it via R or any other tool that can use a JDBC driver to visualize the data and understand it more clearly.
With this quick R script that can be run in R Studio either locally or on an EC2 instance, you can create a visualization of the PRR and Reporting Odds Ratio (RoR) for “GASTROINTESTINAL HAEMORRHAGE” reactions from “ASPIRIN” since 2005 to better understand these trends.
# connect to Athena; drv is the Athena JDBC driver object (driver setup is not shown here)
conn <- dbConnect(drv, '<Your JDBC URL>', s3_staging_dir="<Your S3 Location>", user=Sys.getenv("USER_NAME"), password=Sys.getenv("USER_PASSWORD"))
# Declare Adverse Event
adverseEvent <- "'GASTROINTESTINAL HAEMORRHAGE'"
# Build SQL Blocks
sqlFirst <- "SELECT rpad(receiptdate, 4, 'NA') as receipt_year, count(DISTINCT safetyreportid) as event_count FROM fda.drugsflat WHERE rpad(receiptdate,4,'NA') between '2005' and '2015'"
sqlEnd <- "GROUP BY rpad(receiptdate, 4, 'NA') ORDER BY receipt_year"
# Extract Aspirin with adverse event counts
sql <- paste(sqlFirst,"AND medicinalproduct ='ASPIRIN' AND reactionmeddrapt=",adverseEvent, sqlEnd,sep=" ")
aspirinAdverseCount = dbGetQuery(conn,sql)
# Extract Aspirin counts
sql <- paste(sqlFirst,"AND medicinalproduct ='ASPIRIN'", sqlEnd,sep=" ")
aspirinCount = dbGetQuery(conn,sql)
# Extract adverse event counts
sql <- paste(sqlFirst,"AND reactionmeddrapt=",adverseEvent, sqlEnd,sep=" ")
adverseCount = dbGetQuery(conn,sql)
# All Drug Adverse event Counts
sql <- paste(sqlFirst, sqlEnd,sep=" ")
allDrugCount = dbGetQuery(conn,sql)
# Select correct rows
selAll = allDrugCount$receipt_year == aspirinAdverseCount$receipt_year
selAspirin = aspirinCount$receipt_year == aspirinAdverseCount$receipt_year
selAdverse = adverseCount$receipt_year == aspirinAdverseCount$receipt_year
# Calculate Numbers
m <- c(aspirinAdverseCount$event_count)
n <- c(aspirinCount[selAspirin,2])
M <- c(adverseCount[selAdverse,2])
N <- c(allDrugCount[selAll,2])
# Calculate proportional reporting ratio
PRR = (m/n)/((M-m)/(N-n))
# Calculate reporting Odds Ratio
d = n-m
D = N-M
ROR = (m/d)/(M/D)
# Plot the PRR and ROR
g_range <- range(0, PRR,ROR)
g_range[2] <- g_range[2] + 3
yearLen = length(aspirinAdverseCount$receipt_year)
ax <- aspirinAdverseCount$receipt_year  # x-axis labels (report years)
plot(PRR, type="o", col="blue", ylim=g_range, axes=FALSE, ann=FALSE)
axis(1, 1:yearLen, lab=ax)
axis(2, las=1, at=1*0:g_range[2])
box()
lines(ROR, type="o", pch=22, lty=2, col="red")
As you can see, the PRR and RoR have both remained fairly steady over this time range. With the R Script above, all you need to do is change the adverseEvent variable from GASTROINTESTINAL HAEMORRHAGE to another type of reaction to analyze and compare those trends.
Summary
In this walkthrough:
You used a Scala script on EMR to convert the openFDA zip files to gzip.
You then transformed the JSON blobs into flattened Parquet files using Spark on EMR.
You created an Athena DDL so that you could query these Parquet files residing in S3.
Finally, you pointed the R package at the Athena table to analyze the data without pulling it into a database or creating your own servers.
If you have questions or suggestions, please comment below.
Ryan Hood is a Data Engineer for AWS. He works on big data projects leveraging the newest AWS offerings. In his spare time, he enjoys watching the Cubs win the World Series and attempting to Sous-vide anything he can find in his refrigerator.
Vikram Anand is a Data Engineer for AWS. He works on big data projects leveraging the newest AWS offerings. In his spare time, he enjoys playing soccer and watching the NFL & European Soccer leagues.
Dave Rocamora is a Solutions Architect at Amazon Web Services on the Open Data team. Dave is based in Seattle and when he is not opening data, he enjoys biking and drinking coffee outside.
It is likely that you are developing some form of (web/RESTful) API, and if it is publicly facing (or even if it’s internal), you normally want to rate-limit it somehow. That is, to limit the number of requests performed over a period of time, in order to save resources and protect from abuse.
This can probably be achieved at the web-server/load-balancer level with some clever configuration, but usually you want the rate limiter to be client-specific (i.e. each client of your API should have a separate rate limit), and the way the client is identified varies. It’s probably still possible to do it on the load balancer, but I think it makes sense to have it at the application level.
I’ll use spring-mvc for the example, but any web framework has a good way to plug an interceptor.
So here’s an example of a spring-mvc interceptor:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import javax.annotation.PreDestroy;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

@Component
public class RateLimitingInterceptor extends HandlerInterceptorAdapter {

    private static final Logger logger = LoggerFactory.getLogger(RateLimitingInterceptor.class);

    @Value("${rate.limit.enabled}")
    private boolean enabled;

    @Value("${rate.limit.hourly.limit}")
    private int hourlyLimit;

    private Map<String, SimpleRateLimiter> limiters = new ConcurrentHashMap<>();

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
            throws Exception {
        if (!enabled) {
            return true;
        }
        String clientId = request.getHeader("Client-Id");
        // let non-API requests pass
        if (clientId == null) {
            return true;
        }
        SimpleRateLimiter rateLimiter = getRateLimiter(clientId);
        boolean allowRequest = rateLimiter.tryAcquire();
        if (!allowRequest) {
            response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value());
        }
        response.addHeader("X-RateLimit-Limit", String.valueOf(hourlyLimit));
        return allowRequest;
    }

    // lazily create one rate limiter per client on first use
    private SimpleRateLimiter getRateLimiter(String clientId) {
        return limiters.computeIfAbsent(clientId, id -> createRateLimiter(id));
    }

    private SimpleRateLimiter createRateLimiter(String clientId) {
        logger.info("Creating rate limiter for client {}", clientId);
        return SimpleRateLimiter.create(hourlyLimit, TimeUnit.HOURS);
    }

    @PreDestroy
    public void destroy() {
        // stop the permit-replenishment scheduler of every limiter
        limiters.values().forEach(SimpleRateLimiter::stop);
    }
}
This initializes rate limiters per client on demand. Alternatively, on startup you could simply loop through all registered API clients and create a rate limiter for each. If the rate limiter doesn’t allow more requests (tryAcquire() returns false), then return “Too many requests” and abort the execution of the request (return false from the interceptor).
This sounds simple. But there are a few catches. You may wonder where the SimpleRateLimiter above is defined. We’ll get there, but first let’s see what options we have for rate limiter implementations.
The most recommended one seems to be the Guava RateLimiter. It has a straightforward factory method that gives you a rate limiter for a specified rate (permits per second). However, it doesn’t accommodate web APIs very well, as you can’t initialize the RateLimiter with a pre-existing number of permits. That means a period of time should elapse before the limiter allows requests. There’s another issue – if you have fewer than one permit per second (e.g. if your desired rate limit is “200 requests per hour”), you can pass a fraction (hourlyLimit / secondsInHour), but it still won’t work the way you expect it to, as internally there’s a “maxPermits” field that caps the number of permits to much less than you want. Also, the rate limiter doesn’t allow bursts – you have exactly X permits per second, but you cannot spread them over a longer period, e.g. make 5 requests in one second and then no requests for the next few seconds. In fact, all of the above can be solved, but sadly only through hidden fields that you don’t have access to. Multiple feature requests have existed for years now, but Guava just doesn’t update the rate limiter, making it much less applicable to API rate-limiting.
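To make these limitations concrete, here is a minimal sketch of the naive Guava approach (the 200-per-hour figure is just an example, and this is not the approach used in the rest of this post):
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.RateLimiter;

public class GuavaRateLimiterSketch {
    public static void main(String[] args) {
        // naive attempt at "200 requests per hour": pass the hourly limit as a per-second rate
        RateLimiter limiter = RateLimiter.create(200.0 / TimeUnit.HOURS.toSeconds(1));
        // non-blocking check for one permit; unused permits do not accumulate across the hour
        boolean allowed = limiter.tryAcquire();
        System.out.println("allowed = " + allowed);
    }
}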
Using reflection, you can tweak the parameters and make the limiter work. However, it’s ugly, and it’s not guaranteed to work as expected. I have shown here how to initialize a Guava rate limiter with X permits per hour, with burstability and full initial permits. When I thought that would do, I saw that tryAcquire() has a synchronized(..) block. Does that mean all requests will wait for each other when simply checking whether they are allowed to make a request? That would be horrible.
So in fact the Guava RateLimiter is not meant for (web) API rate-limiting. Maybe keeping it feature-poor is Guava’s way of discouraging people from misusing it? That’s why I wrote a simple rate limiter of my own, based on a semaphore:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SimpleRateLimiter {
    private Semaphore semaphore;
    private int maxPermits;
    private TimeUnit timePeriod;
    private ScheduledExecutorService scheduler;

    public static SimpleRateLimiter create(int permits, TimeUnit timePeriod) {
        SimpleRateLimiter limiter = new SimpleRateLimiter(permits, timePeriod);
        limiter.schedulePermitReplenishment();
        return limiter;
    }

    private SimpleRateLimiter(int permits, TimeUnit timePeriod) {
        this.semaphore = new Semaphore(permits);
        this.maxPermits = permits;
        this.timePeriod = timePeriod;
    }

    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    public void stop() {
        scheduler.shutdownNow();
    }

    public void schedulePermitReplenishment() {
        scheduler = Executors.newScheduledThreadPool(1);
        // every 1 <timePeriod>, top the semaphore back up to maxPermits
        scheduler.scheduleAtFixedRate(() -> {
            semaphore.release(maxPermits - semaphore.availablePermits());
        }, 1, 1, timePeriod);
    }
}
It takes a number of permits (the allowed number of requests) and a time period. The time period is “1 X”, where X can be second/minute/hour/day – depending on how you want your limit to be configured – per second, per minute, hourly, daily. Every 1 X, a scheduler replenishes the acquired permits (in the example above there’s one scheduler per client, which may be inefficient with a large number of clients – you can pass a shared scheduler pool instead). There is no control for bursts (a client can spend all permits in a rapid succession of requests), there is no warm-up functionality, and there is no gradual replenishment. Depending on what you want, this may not be ideal, but it’s just a basic rate limiter that is thread-safe and doesn’t have any blocking. I wrote a unit test to confirm that the limiter behaves properly, and also ran performance tests against a local application to make sure the limit is obeyed. So far it seems to be working.
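For example, a limiter for 200 requests per hour (an arbitrary, illustrative limit) would be used roughly like this:
import java.util.concurrent.TimeUnit;

public class SimpleRateLimiterUsage {
    public static void main(String[] args) {
        // 200 requests per hour is just an example limit
        SimpleRateLimiter limiter = SimpleRateLimiter.create(200, TimeUnit.HOURS);
        if (limiter.tryAcquire()) {
            System.out.println("request allowed");
        } else {
            System.out.println("reject with HTTP 429 Too Many Requests");
        }
        limiter.stop(); // shut down the replenishment scheduler
    }
}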
Are there alternatives? Well, yes – there are libraries like RateLimitJ that use Redis to implement rate-limiting. That would mean, however, that you need to set up and run Redis, which seems like a lot of overhead for “simply” having rate-limiting. (Note: it also seems to have an in-memory version.)
On the other hand, how would rate-limiting work properly in a cluster of application nodes? Do the nodes need some database or gossip protocol to share data about the per-client permits (requests) remaining? Not necessarily. A very simple approach to this issue would be to assume that the load balancer distributes the load equally among your nodes. That way you would just have to set the limit on each node to be equal to the total limit divided by the number of nodes. It won’t be exact, but you rarely need it to be – allowing 5-10 more requests won’t kill your application, and allowing 5-10 fewer won’t be dramatic for the users.
That, however, would mean that you have to know the number of application nodes. If you employ auto-scaling (e.g. in AWS), the number of nodes may change depending on the load. In that case, instead of configuring a hard-coded number of permits, the replenishing scheduled job can calculate the “maxPermits” on the fly, by calling an AWS (or other cloud-provider) API to obtain the number of nodes in the current auto-scaling group. That would still be simpler than supporting a Redis deployment just for that.
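As a rough sketch of that idea using the AWS SDK for Java (v1), where the auto-scaling group name is a placeholder and in practice you would cache the result instead of calling the API on every replenishment:
import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsRequest;
import com.amazonaws.services.autoscaling.model.DescribeAutoScalingGroupsResult;

public class PerNodePermits {
    // divide the total limit by the current number of instances in the auto-scaling group
    public static int permitsPerNode(int totalLimit) {
        AmazonAutoScaling autoScaling = AmazonAutoScalingClientBuilder.defaultClient();
        DescribeAutoScalingGroupsResult result = autoScaling.describeAutoScalingGroups(
                new DescribeAutoScalingGroupsRequest().withAutoScalingGroupNames("my-api-asg"));
        int nodeCount = result.getAutoScalingGroups().get(0).getInstances().size();
        return Math.max(1, totalLimit / nodeCount);
    }
}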
Overall, I’m surprised there isn’t a “canonical” way to implement rate-limiting (in Java). Maybe the need for rate-limiting is not as common as it may seem. Or it’s implemented manually – by temporarily banning API clients that use “too many resources”.
Update: someone pointed out the bucket4j project, which seems nice and worth taking a look at.