We have two new resources to help customers address their data protection requirements in Argentina. These resources specifically address the needs outlined under the Personal Data Protection Law No. 25.326, as supplemented by Regulatory Decree No. 1558/2001 (“PDPL”), including Disposition No. 11/2006. For context, the PDPL is an Argentine federal law that applies to the protection of personal data, including during transfer and processing.
A new webpage focused on data privacy in Argentina features FAQs, helpful links, and whitepapers that provide an overview of PDPL considerations, as well as our security assurance frameworks and international certifications, including ISO 27001, ISO 27017, and ISO 27018. You’ll also find details about our Information Request Report and the high bar of security at AWS data centers.
Additionally, we’ve released a new workbook that offers a detailed mapping as to how customers can operate securely under the Shared Responsibility Model while also aligning with Disposition No. 11/2006. The AWS Disposition 11/2006 Workbook can be downloaded from the Argentina Data Privacy page or directly from this link. Both resources are also available in Spanish from the Privacidad de los datos en Argentina page.
Want more AWS Security news? Follow us on Twitter.
Last year, we released Amazon Connect, a cloud-based contact center service that enables any business to deliver better customer service at low cost. This service is built based on the same technology that empowers Amazon customer service associates. Using this system, associates have millions of conversations with customers when they inquire about their shipping or order information. Because we made it available as an AWS service, you can now enable your contact center agents to make or receive calls in a matter of minutes. You can do this without having to provision any kind of hardware. 2
There are several advantages of building your contact center in the AWS Cloud, as described in our documentation. In addition, customers can extend Amazon Connect capabilities by using AWS products and the breadth of AWS services. In this blog post, we focus on how to get analytics out of the rich set of data published by Amazon Connect. We make use of an Amazon Connect data stream and create an end-to-end workflow to offer an analytical solution that can be customized based on need.
Solution overview
The following diagram illustrates the solution.
In this solution, Amazon Connect exports its contact trace records (CTRs) using Amazon Kinesis. CTRs are data streams in JSON format, and each has information about individual contacts. For example, this information might include the start and end time of a call, which agent handled the call, which queue the user chose, queue wait times, number of holds, and so on. You can enable this feature by reviewing our documentation.
In this architecture, we use Kinesis Firehose to capture Amazon Connect CTRs as raw data in an Amazon S3 bucket. We don’t use the recent feature added by Kinesis Firehose to save the data in S3 as Apache Parquet format. We use AWS Glue functionality to automatically detect the schema on the fly from an Amazon Connect data stream.
The primary reason for this approach is that it allows us to use attributes and enables an Amazon Connect administrator to dynamically add more fields as needed. Also by converting data to parquet in batch (every couple of hours) compression can be higher. However, if your requirement is to ingest the data in Parquet format on realtime, we recoment using Kinesis Firehose recently launched feature. You can review this blog post for further information.
By default, Firehose puts these records in time-series format. To make it easy for AWS Glue crawlers to capture information from new records, we use AWS Lambda to move all new records to a single S3 prefix called flatfiles. Our Lambda function is configured using S3 event notification. To comply with AWS Glue and Athena best practices, the Lambda function also converts all column names to lowercase. Finally, we also use the Lambda function to start AWS Glue crawlers. AWS Glue crawlers identify the data schema and update the AWS Glue Data Catalog, which is used by extract, transform, load (ETL) jobs in AWS Glue in the latter half of the workflow.
You can see our approach in the Lambda code following.
from __future__ import print_function
import json
import urllib
import boto3
import os
import re
s3 = boto3.resource('s3')
client = boto3.client('s3')
def convertColumntoLowwerCaps(obj):
for key in obj.keys():
new_key = re.sub(r'[\W]+', '', key.lower())
v = obj[key]
if isinstance(v, dict):
if len(v) > 0:
convertColumntoLowwerCaps(v)
if new_key != key:
obj[new_key] = obj[key]
del obj[key]
return obj
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
try:
client.download_file(bucket, key, '/tmp/file.json')
with open('/tmp/out.json', 'w') as output, open('/tmp/file.json', 'rb') as file:
i = 0
for line in file:
for object in line.replace("}{","}\n{").split("\n"):
record = json.loads(object,object_hook=convertColumntoLowwerCaps)
if i != 0:
output.write("\n")
output.write(json.dumps(record))
i += 1
newkey = 'flatfiles/' + key.replace("/", "")
client.upload_file('/tmp/out.json', bucket,newkey)
s3.Object(bucket,key).delete()
return "success"
except Exception as e:
print(e)
print('Error coping object {} from bucket {}'.format(key, bucket))
raise e
We trigger AWS Glue crawlers based on events because this approach lets us capture any new data frame that we want to be dynamic in nature. CTR attributes are designed to offer multiple custom options based on a particular call flow. Attributes are essentially key-value pairs in nested JSON format. With the help of event-based AWS Glue crawlers, you can easily identify newer attributes automatically.
We recommend setting up an S3 lifecycle policy on the flatfiles folder that keeps records only for 24 hours. Doing this optimizes AWS Glue ETL jobs to process a subset of files rather than the entire set of records.
After we have data in the flatfiles folder, we use AWS Glue to catalog the data and transform it into Parquet format inside a folder called parquet/ctr/. The AWS Glue job performs the ETL that transforms the data from JSON to Parquet format. We use AWS Glue crawlers to capture any new data frame inside the JSON code that we want to be dynamic in nature. What this means is that when you add new attributes to an Amazon Connect instance, the solution automatically recognizes them and incorporates them in the schema of the results.
After AWS Glue stores the results in Parquet format, you can perform analytics using Amazon Redshift Spectrum, Amazon Athena, or any third-party data warehouse platform. To keep this solution simple, we have used Amazon Athena for analytics. Amazon Athena allows us to query data without having to set up and manage any servers or data warehouse platforms. Additionally, we only pay for the queries that are executed.
Try it out!
You can get started with our sample AWS CloudFormation template. This template creates the components starting from the Kinesis stream and finishes up with S3 buckets, the AWS Glue job, and crawlers. To deploy the template, open the AWS Management Console by clicking the following link.
In the console, specify the following parameters:
BucketName: The name for the bucket to store all the solution files. This name must be unique; if it’s not, template creation fails.
etlJobSchedule: The schedule in cron format indicating how often the AWS Glue job runs. The default value is every hour.
KinesisStreamName: The name of the Kinesis stream to receive data from Amazon Connect. This name must be different from any other Kinesis stream created in your AWS account.
s3interval: The interval in seconds for Kinesis Firehose to save data inside the flatfiles folder on S3. The value must between 60 and 900 seconds.
sampledata: When this parameter is set to true, sample CTR records are used. Doing this lets you try this solution without setting up an Amazon Connect instance. All examples in this walkthrough use this sample data.
Select the “I acknowledge that AWS CloudFormation might create IAM resources.” check box, and then choose Create. After the template finishes creating resources, you can see the stream name on the stack Outputs tab.
If you haven’t created your Amazon Connect instance, you can do so by following the Getting Started Guide. When you are done creating, choose your Amazon Connect instance in the console, which takes you to instance settings. Choose Data streaming to enable streaming for CTR records. Here, you can choose the Kinesis stream (defined in the KinesisStreamName parameter) that was created by the CloudFormation template.
Now it’s time to generate the data by making or receiving calls by using Amazon Connect. You can go to Amazon Connect Cloud Control Panel (CCP) to make or receive calls using a software phone or desktop phone. After a few minutes, we should see data inside the flatfiles folder. To make it easier to try this solution, we provide sample data that you can enable by setting the sampledata parameter to true in your CloudFormation template.
You can navigate to the AWS Glue console by choosing Jobs on the left navigation pane of the console. We can select our job here. In my case, the job created by CloudFormation is called glueJob-i3TULzVtP1W0; yours should be similar. You run the job by choosing Run job for Action.
After that, we wait for the AWS Glue job to run and to finish successfully. We can track the status of the job by checking the History tab.
When the job finishes running, we can check the Database section. There should be a new table created called ctr in Parquet format.
To query the data with Athena, we can select the ctr table, and for Action choose View data.
Doing this takes us to the Athena console. If you run a query, Athena shows a preview of the data.
When we can query the data using Athena, we can visualize it using Amazon QuickSight. Before connecting Amazon QuickSight to Athena, we must make sure to grant Amazon QuickSight access to Athena and the associated S3 buckets in the account. For more information on doing this, see Managing Amazon QuickSight Permissions to AWS Resources in the Amazon QuickSight User Guide. We can then create a new data set in Amazon QuickSight based on the Athena table that was created.
After setting up permissions, we can create a new analysis in Amazon QuickSight by choosing New analysis.
Then we add a new data set.
We choose Athena as the source and give the data source a name (in this case, I named it connectctr).
Choose the name of the database and the table referencing the Parquet results.
Then choose Visualize.
After that, we should see the following screen.
Now we can create some visualizations. First, search for the agent.username column, and drag it to the AutoGraph section.
We can see the agents and the number of calls for each, so we can easily see which agents have taken the largest amount of calls. If we want to see from what queues the calls came for each agent, we can add the queue.arn column to the visual.
After following all these steps, you can use Amazon QuickSight to add different columns from the call records and perform different types of visualizations. You can build dashboards that continuously monitor your connect instance. You can share those dashboards with others in your organization who might need to see this data.
Conclusion
In this post, you see how you can use services like AWS Lambda, AWS Glue, and Amazon Athena to process Amazon Connect call records. The post also demonstrates how to use AWS Lambda to preprocess files in Amazon S3 and transform them into a format that recognized by AWS Glue crawlers. Finally, the post shows how to used Amazon QuickSight to perform visualizations.
You can use the provided template to analyze your own contact center instance. Or you can take the CloudFormation template and modify it to process other data streams that can be ingested using Amazon Kinesis or stored on Amazon S3.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.
Peter Dalbhanjan is a Solutions Architect for AWS based in Herndon, VA. Peter has a keen interest in evangelizing AWS solutions and has written multiple blog posts that focus on simplifying complex use cases. At AWS, Peter helps with designing and architecting variety of customer workloads.
Previously, I showed you how to rotate Amazon RDS database credentials automatically with AWS Secrets Manager. In addition to database credentials, AWS Secrets Manager makes it easier to rotate, manage, and retrieve API keys, OAuth tokens, and other secrets throughout their lifecycle. You can configure Secrets Manager to rotate these secrets automatically, which can help you meet your compliance needs. You can also use Secrets Manager to rotate secrets on demand, which can help you respond quickly to security events. In this post, I show you how to store an API key in Secrets Manager and use a custom Lambda function to rotate the key automatically. I’ll use a Twitter API key and bearer token as an example; you can reference this example to rotate other types of API keys.
The instructions are divided into four main phases:
Store a Twitter API key and bearer token in Secrets Manager.
Create a custom Lambda function to rotate the bearer token.
Configure your application to retrieve the bearer token from Secrets Manager.
Configure Secrets Manager to use the custom Lambda function to rotate the bearer token automatically.
For the purpose of this post, I use the placeholder Demo/Twitter_Api_Key to denote the API key, the placeholder Demo/Twitter_bearer_token to denote the bearer token, and placeholder Lambda_Rotate_Bearer_Token to denote the custom Lambda function. Be sure to replace these placeholders with the resource names from your account.
Phase 1: Store a Twitter API key and bearer token in Secrets Manager
Twitter enables developers to register their applications and retrieve an API key, which includes a consumer_key and consumer_secret. Developers use these to generate a bearer token that applications can then use to authenticate and retrieve information from Twitter. At any given point of time, you can use an API key to create only one valid bearer token.
Start by storing the API key in Secrets Manager. Here’s how:
Figure 1: The “Store a new secret” button in the AWS Secrets Manager console
Select Other type of secrets (because you’re storing an API key).
Input the consumer_key and consumer_secret, and then select Next.
Figure 2: Select the consumer_key and the consumer_secret
Specify values for Secret Name and Description, then select Next. For this example, I use Demo/Twitter_API_Key.
Figure 3: Set values for “Secret Name” and “Description”
On the next screen, keep the default setting, Disable automatic rotation, because you’ll use the same API key to rotate bearer tokens programmatically and automatically. Applications and employees will not retrieve this API key. Select Next.
Figure 4: Keep the default “Disable automatic rotation” setting
Review the information on the next screen and, if everything looks correct, select Store. You’ve now successfully stored a Twitter API key in Secrets Manager.
Next, store the bearer token in Secrets Manager. Here’s how:
From the Secrets Manager console, select Store a new secret, select Other type of secrets, input details (access_token, token_type, and ARN of the API key) about the bearer token, and then select Next.
Figure 5: Add details about the bearer token
Specify values for Secret Name and Description, and then select Next. For this example, I use Demo/Twitter_bearer_token.
Figure 6: Again set values for “Secret Name” and “Description”
Keep the default rotation setting, Disable automatic rotation, and then select Next. You’ll enable rotation after you’ve updated the application to use Secrets Manager APIs to retrieve secrets.
Review the information and select Store. You’ve now completed storing the bearer token in Secrets Manager. I take note of the sample code provided on the review page. I’ll use this code to update my application to retrieve the bearer token using Secrets Manager APIs.
Figure 7: The sample code you can use in your app
Phase 2: Create a custom Lambda function to rotate the bearer token
While Secrets Manager supports rotating credentials for databases hosted on Amazon RDS natively, it also enables you to meet your unique rotation-related use cases by authoring custom Lambda functions. Now that you’ve stored the API key and bearer token, you’ll create a Lambda function to rotate the bearer token. For this example, I’ll create my Lambda function using Python 3.6.
Figure 8: In the Lambda console, select “Create function”
Select Author from scratch. For this example, I use the name Lambda_Rotate_Bearer_Token for my Lambda function. I also set the Runtime environment as Python 3.6.
Figure 9: Create a new function from scratch
This Lambda function requires permissions to call AWS resources on your behalf. To grant these permissions, select Create a custom role. This opens a console tab.
Select Create a new IAM Role and specify the value for Role Name. For this example, I use Role_Lambda_Rotate_Twitter_Bearer_Token.
Figure 10: For “IAM Role,” select “Create a new IAM role”
Next, to define the IAM permissions, copy and paste the following IAM policy in the View Policy Document text-entry field. Be sure to replace the placeholder ARN-OF-Demo/Twitter_API_Key with the ARN of your secret.
Figure 11: The IAM policy pasted in the “View Policy Document” text-entry field
Now, select Allow. This brings me back to the Lambda console with the appropriate Role selected.
Select Create function.
Figure 12: Select the “Create function” button in the lower-right corner
Copy the following Python code and paste it in the Function code section.
import base64
import json
import logging
import os
import boto3
from botocore.vendored import requests
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""Secrets Manager Twitter Bearer Token Handler
This handler uses the master-user rotation scheme to rotate a bearer token of a Twitter app.
The Secret PlaintextString is expected to be a JSON string with the following format:
{
'access_token': ,
'token_type': ,
'masterarn':
}
Args:
event (dict): Lambda dictionary of event parameters. These keys must include the following:
- SecretId: The secret ARN or identifier
- ClientRequestToken: The ClientRequestToken of the secret version
- Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
context (LambdaContext): The Lambda runtime information
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not properly configured for rotation
KeyError: If the secret json does not contain the expected keys
"""
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
# Setup the client and environment variables
service_client = boto3.client('secretsmanager', endpoint_url=os.environ['SECRETS_MANAGER_ENDPOINT'])
oauth2_token_url = os.environ['TWITTER_OAUTH2_TOKEN_URL']
oauth2_invalid_token_url = os.environ['TWITTER_OAUTH2_INVALID_TOKEN_URL']
tweet_search_url = os.environ['TWITTER_SEARCH_URL']
# Make sure the version is staged correctly
metadata = service_client.describe_secret(SecretId=arn)
if not metadata['RotationEnabled']:
logger.error("Secret %s is not enabled for rotation" % arn)
raise ValueError("Secret %s is not enabled for rotation" % arn)
versions = metadata['VersionIdsToStages']
if token not in versions:
logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
if "AWSCURRENT" in versions[token]:
logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
return
elif "AWSPENDING" not in versions[token]:
logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
# Call the appropriate step
if step == "createSecret":
create_secret(service_client, arn, token, oauth2_token_url, oauth2_invalid_token_url)
elif step == "setSecret":
set_secret(service_client, arn, token, oauth2_token_url)
elif step == "testSecret":
test_secret(service_client, arn, token, tweet_search_url)
elif step == "finishSecret":
finish_secret(service_client, arn, token)
else:
logger.error("lambda_handler: Invalid step parameter %s for secret %s" % (step, arn))
raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))
def create_secret(service_client, arn, token, oauth2_token_url, oauth2_invalid_token_url):
"""Get a new bearer token from Twitter
This method invalidates existing bearer token for the Twitter app and retrieves a new one from Twitter.
If a secret version with AWSPENDING stage exists, updates it with the newly retrieved bearer token and if
the AWSPENDING stage does not exist, creates a new version of the secret with that stage label.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
oauth2_token_url (string): The Twitter API endpoint to request a bearer token
oauth2_invalid_token_url (string): The Twitter API endpoint to invalidate a bearer token
Raises:
ValueError: If the current secret is not valid JSON
KeyError: If the secret json does not contain the expected keys
ResourceNotFoundException: If the current secret is not found
"""
# Make sure the current secret exists and try to get the master arn from the secret
try:
current_secret_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
master_arn = current_secret_dict['masterarn']
logger.info("createSecret: Successfully retrieved secret for %s." % arn)
except service_client.exceptions.ResourceNotFoundException:
return
# create bearer token credentials to be passed as authorization string to Twitter
bearer_token_credentials = encode_credentials(service_client, master_arn, "AWSCURRENT")
# get the bearer token from Twitter
bearer_token_from_twitter = get_bearer_token(bearer_token_credentials,oauth2_token_url)
# invalidate the current bearer token
invalidate_bearer_token(oauth2_invalid_token_url,bearer_token_credentials,bearer_token_from_twitter)
# get a new bearer token from Twitter
new_bearer_token = get_bearer_token(bearer_token_credentials, oauth2_token_url)
# if a secret version with AWSPENDING stage exists, update it with the lastest bearer token
# if the AWSPENDING stage does not exist, then create the version with AWSPENDING stage
try:
pending_secret_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)
pending_secret_dict['access_token'] = new_bearer_token
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(pending_secret_dict), VersionStages=['AWSPENDING'])
logger.info("createSecret: Successfully invalidated the bearer token of the secret %s and updated the pending version" % arn)
except service_client.exceptions.ResourceNotFoundException:
current_secret_dict['access_token'] = new_bearer_token
service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_secret_dict), VersionStages=['AWSPENDING'])
logger.info("createSecret: Successfully invalidated the bearer token of the secret %s and and created the pending version." % arn)
def set_secret(service_client, arn, token, oauth2_token_url):
"""Validate the pending secret with that in Twitter
This method checks wether the bearer token in Twitter is the same as the one in the version with AWSPENDING stage.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
oauth2_token_url (string): The Twitter API endopoint to get a bearer token
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON or master credentials could not be used to login to DB
KeyError: If the secret json does not contain the expected keys
"""
# First get the pending version of the bearer token and compare it with that in Twitter
pending_secret_dict = get_secret_dict(service_client, arn, "AWSPENDING")
master_arn = pending_secret_dict['masterarn']
# create bearer token credentials to be passed as authorization string to Twitter
bearer_token_credentials = encode_credentials(service_client, master_arn, "AWSCURRENT")
# get the bearer token from Twitter
bearer_token_from_twitter = get_bearer_token(bearer_token_credentials, oauth2_token_url)
# if the bearer tokens are same, invalidate the bearer token in Twitter
# if not, raise an exception that bearer token in Twitter was changed outside Secrets Manager
if pending_secret_dict['access_token'] == bearer_token_from_twitter:
logger.info("createSecret: Successfully verified the bearer token of arn %s" % arn)
else:
raise ValueError("The bearer token of the Twitter app was changed outside Secrets Manager. Please check.")
def test_secret(service_client, arn, token, tweet_search_url):
"""Test the pending secret by calling a Twitter API
This method tries to use the bearer token in the secret version with AWSPENDING stage and search for tweets
with 'aws secrets manager' string.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON or pending credentials could not be used to login to the database
KeyError: If the secret json does not contain the expected keys
"""
# First get the pending version of the bearer token and compare it with that in Twitter
pending_secret_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)
# Now verify you can search for tweets using the bearer token
if verify_bearer_token(pending_secret_dict['access_token'], tweet_search_url):
logger.info("testSecret: Successfully authorized with the pending secret in %s." % arn)
return
else:
logger.error("testSecret: Unable to authorize with the pending secret of secret ARN %s" % arn)
raise ValueError("Unable to connect to Twitter with pending secret of secret ARN %s" % arn)
def finish_secret(service_client, arn, token):
"""Finish the rotation by marking the pending secret as current
This method moves the secret from the AWSPENDING stage to the AWSCURRENT stage.
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
"""
# First describe the secret to get the current version
metadata = service_client.describe_secret(SecretId=arn)
current_version = None
for version in metadata["VersionIdsToStages"]:
if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
if version == token:
# The correct version is already marked as current, return
logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
return
current_version = version
break
# Finalize by staging the secret version current
service_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version)
logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (version, arn))
def encode_credentials(service_client, arn, stage):
"""Encodes the Twitter credentials
This helper function encodes the Twitter credentials (consumer_key and consumer_secret)
Args:
service_client (client):The secrets manager service client
arn (string): The secret ARN or other identifier
stage (stage): The stage identifying the secret version
Returns:
encoded_credentials (string): base64 encoded authorization string for Twitter
Raises:
KeyError: If the secret json does not contain the expected keys
"""
required_fields = ['consumer_key','consumer_secret']
master_secret_dict = get_secret_dict(service_client, arn, stage)
for field in required_fields:
if field not in master_secret_dict:
raise KeyError("%s key is missing from the secret JSON" % field)
encoded_credentials = base64.urlsafe_b64encode(
'{}:{}'.format(master_secret_dict['consumer_key'], master_secret_dict['consumer_secret']).encode('ascii')).decode('ascii')
return encoded_credentials
def get_bearer_token(encoded_credentials, oauth2_token_url):
"""Gets a bearer token from Twitter
This helper function retrieves the current bearer token from Twitter, given a set of credentials.
Args:
encoded_credentials (string): Twitter credentials for authentication
oauth2_token_url (string): REST API endpoint to request a bearer token from Twitter
Raises:
KeyError: If the secret json does not contain the expected keys
"""
headers = {
'Authorization': 'Basic {}'.format(encoded_credentials),
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
}
data = 'grant_type=client_credentials'
response = requests.post(oauth2_token_url, headers=headers, data=data)
response_data = response.json()
if response_data['token_type'] == 'bearer':
bearer_token = response_data['access_token']
return bearer_token
else:
raise RuntimeError('unexpected token type: {}'.format(response_data['token_type']))
def invalidate_bearer_token(oauth2_invalid_token_url, bearer_token_credentials, bearer_token):
"""Invalidates a Bearer Token of a Twitter App
This helper function invalidates a bearer token of a Twitter app.
If successful, it returns the invalidated bearer token, else None
Args:
oauth2_invalid_token_url (string): The Twitter API endpoint to invalidate a bearer token
bearer_token_credentials (string): encoded consumer key and consumer secret to authenticate with Twitter
bearer_token (string): The bearer token to be invalidated
Returns:
invalidated_bearer_token: The invalidated bearer token
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON
KeyError: If the secret json does not contain the expected keys
"""
headers = {
'Authorization': 'Basic {}'.format(bearer_token_credentials),
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
}
data = 'access_token=' + bearer_token
invalidate_response = requests.post(oauth2_invalid_token_url, headers=headers, data=data)
invalidate_response_data = invalidate_response.json()
if invalidate_response_data:
return
else:
raise RuntimeError('Invalidate bearer token request failed')
def verify_bearer_token(bearer_token, tweet_search_url):
"""Verifies access to Twitter APIs using a bearer token
This helper function verifies that the bearer token is valid by calling Twitter's search/tweets API endpoint
Args:
bearer_token (string): The current bearer token for the application
Returns:
True or False
Raises:
KeyError: If the response of search tweets API call fails
"""
headers = {
'Authorization' : 'Bearer {}'.format(bearer_token),
'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
}
search_results = requests.get(tweet_search_url, headers=headers)
try:
search_results.json()['statuses']
return True
except:
return False
def get_secret_dict(service_client, arn, stage, token=None):
"""Gets the secret dictionary corresponding for the secret arn, stage, and token
This helper function gets credentials for the arn and stage passed in and returns the dictionary by parsing the JSON string
Args:
service_client (client): The secrets manager service client
arn (string): The secret ARN or other identifier
token (string): The ClientRequestToken associated with the secret version, or None if no validation is desired
stage (string): The stage identifying the secret version
Returns:
SecretDictionary: Secret dictionary
Raises:
ResourceNotFoundException: If the secret with the specified arn and stage does not exist
ValueError: If the secret is not valid JSON
"""
# Only do VersionId validation against the stage if a token is passed in
if token:
secret = service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage)
else:
secret = service_client.get_secret_value(SecretId=arn, VersionStage=stage)
plaintext = secret['SecretString']
# Parse and return the secret JSON string
return json.loads(plaintext)
Here’s what it will look like:
Figure 13: The Python code pasted in the “Function code” section
On the same page, provide the following environment variables:
Note: Resources used in this example are in US East (Ohio) region. If you intend to use another AWS Region, change the SECRETS_MANAGER_ENDPOINT set in the Environment variables to the appropriate region.
You’ve now created a Lambda function that can rotate the bearer token:
Figure 15: The new Lambda function
Before you can configure Secrets Manager to use this Lambda function, you need to update the function policy of the Lambda function. A function policy permits AWS services, such as Secrets Manager, to invoke a Lambda function on behalf of your application. You can attach a Lambda function policy from the AWS Command Line Interface (AWS CLI) or SDK. To attach a function policy, call the add-permission Lambda API from the AWS CLI.
Phase 3: Configure your application to retrieve the bearer token from Secrets Manager
Now that you’ve stored the bearer token in Secrets Manager, update the application to retrieve the bearer token from Secrets Manager instead of hard-coding this information in a configuration file or source code. For this example, I show you how to configure a Python application to retrieve this secret from Secrets Manager.
import config
def no_secrets_manager_sample()
# Get the bearer token from a config file.
Bearer_token = config.bearer_token
# Use the bearer token to authenticate requests to Twitter
Use the sample code from section titled Phase 1 and update the application to retrieve the bearer token from Secrets Manager. The following code sets up the client and retrieves and decrypts the secret Demo/Twitter_bearer_token.
# Use this code snippet in your app.
import boto3
from botocore.exceptions import ClientError
def get_secret():
secret_name = "Demo/Twitter_bearer_token"
endpoint_url = "https://secretsmanager.us-east-2.amazonaws.com"
region_name = "us-east-2"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name,
endpoint_url=endpoint_url
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
print("The requested secret " + secret_name + " was not found")
elif e.response['Error']['Code'] == 'InvalidRequestException':
print("The request was invalid due to:", e)
elif e.response['Error']['Code'] == 'InvalidParameterException':
print("The request had invalid params:", e)
else:
# Decrypted secret using the associated KMS CMK
# Depending on whether the secret was a string or binary, one of these fields will be populated
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
binary_secret_data = get_secret_value_response['SecretBinary']
# Your code goes here.
Applications require permissions to access Secrets Manager. My application runs on Amazon EC2 and uses an IAM role to get access to AWS services. I’ll attach the following policy to my IAM role, and you should take a similar action with your IAM role. This policy uses the GetSecretValue action to grant my application permissions to read secrets from Secrets Manager. This policy also uses the resource element to limit my application to read only the Demo/Twitter_bearer_token secret from Secrets Manager. Read the AWS Secrets Manager documentation to understand the minimum IAM permissions required to retrieve a secret.
{
"Version": "2012-10-17",
"Statement": {
"Sid": "RetrieveBearerToken",
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": Input ARN of the secret Demo/Twitter_bearer_token here
}
}
Note: To improve the resiliency of your applications, associate your application with two API keys/bearer tokens. This is a higher availability option because you can continue to use one bearer token while Secrets Manager rotates the other token. Read the AWS documentation to learn how AWS Secrets Manager rotates your secrets.
Phase 4: Enable and verify rotation
Now that you’ve stored the secret in Secrets Manager and created a Lambda function to rotate this secret, configure Secrets Manager to rotate the secret Demo/Twitter_bearer_token.
From the Secrets Manager console, go to the list of secrets and choose the secret you created in the first step (in my example, this is named Demo/Twitter_bearer_token).
Scroll to Rotation configuration, and then select Edit rotation.
Figure 16: Select the “Edit rotation” button
To enable rotation, select Enable automatic rotation, and then choose how frequently you want Secrets Manager to rotate this secret. For this example, I set the rotation interval to 30 days. I also choose the rotation Lambda function, Lambda_Rotate_Bearer_Token, from the drop-down list.
Figure 17: “Edit rotation configuration” options
The banner on the next screen confirms that I have successfully configured rotation and the first rotation is in progress, which enables you to verify that rotation is functioning as expected. Secrets Manager will rotate this credential automatically every 30 days.
Figure 18: Confirmation notice
Summary
In this post, I showed you how to configure Secrets Manager to manage and rotate an API key and bearer token used by applications to authenticate and retrieve information from Twitter. You can use the steps described in this blog to manage and rotate other API keys, as well.
Secrets Manager helps you protect access to your applications, services, and IT resources without the upfront investment and on-going maintenance costs of operating your own secrets management infrastructure. To get started, open the Secrets Manager console. To learn more, read the Secrets Manager documentation.
If you have comments about this post, submit them in the Comments section below. If you have questions about anything in this post, start a new thread on the Secrets Manager forum or contact AWS Support.
Want more AWS Security news? Follow us on Twitter.
The adoption of Apache Spark has increased significantly over the past few years, and running Spark-based application pipelines is the new normal. Spark jobs that are in an ETL (extract, transform, and load) pipeline have different requirements—you must handle dependencies in the jobs, maintain order during executions, and run multiple jobs in parallel. In most of these cases, you can use workflow scheduler tools like Apache Oozie, Apache Airflow, and even Cron to fulfill these requirements.
Apache Oozie is a widely used workflow scheduler system for Hadoop-based jobs. However, its limited UI capabilities, lack of integration with other services, and heavy XML dependency might not be suitable for some users. On the other hand, Apache Airflow comes with a lot of neat features, along with powerful UI and monitoring capabilities and integration with several AWS and third-party services. However, with Airflow, you do need to provision and manage the Airflow server. The Cron utility is a powerful job scheduler. But it doesn’t give you much visibility into the job details, and creating a workflow using Cron jobs can be challenging.
What if you have a simple use case, in which you want to run a few Spark jobs in a specific order, but you don’t want to spend time orchestrating those jobs or maintaining a separate application? You can do that today in a serverless fashion using AWS Step Functions. You can create the entire workflow in AWS Step Functions and interact with Spark on Amazon EMR through Apache Livy.
In this post, I walk you through a list of steps to orchestrate a serverless Spark-based ETL pipeline using AWS Step Functions and Apache Livy.
Input data
For the source data for this post, I use the New York City Taxi and Limousine Commission (TLC) trip record data. For a description of the data, see this detailed dictionary of the taxi data. In this example, we’ll work mainly with the following three columns for the Spark jobs.
Column name
Column description
RateCodeID
Represents the rate code in effect at the end of the trip (for example, 1 for standard rate, 2 for JFK airport, 3 for Newark airport, and so on).
FareAmount
Represents the time-and-distance fare calculated by the meter.
TripDistance
Represents the elapsed trip distance in miles reported by the taxi meter.
The trip data is in comma-separated values (CSV) format with the first row as a header. To shorten the Spark execution time, I trimmed the large input data to only 20,000 rows. During the deployment phase, the input file tripdata.csv is stored in Amazon S3 in the <<your-bucket>>/emr-step-functions/input/ folder.
The following image shows a sample of the trip data:
Solution overview
The next few sections describe how Spark jobs are created for this solution, how you can interact with Spark using Apache Livy, and how you can use AWS Step Functions to create orchestrations for these Spark applications.
At a high level, the solution includes the following steps:
Trigger the AWS Step Function state machine by passing the input file path.
The first stage in the state machine triggers an AWS Lambda
The Lambda function interacts with Apache Spark running on Amazon EMR using Apache Livy, and submits a Spark job.
The state machine waits a few seconds before checking the Spark job status.
Based on the job status, the state machine moves to the success or failure state.
Subsequent Spark jobs are submitted using the same approach.
The state machine waits a few seconds for the job to finish.
The job finishes, and the state machine updates with its final status.
Let’s take a look at the Spark application that is used for this solution.
Spark jobs
For this example, I built a Spark jar named spark-taxi.jar. It has two different Spark applications:
MilesPerRateCode – The first job that runs on the Amazon EMR cluster. This job reads the trip data from an input source and computes the total trip distance for each rate code. The output of this job consists of two columns and is stored in Apache Parquet format in the output path.
The following are the expected output columns:
rate_code – Represents the rate code for the trip.
total_distance – Represents the total trip distance for that rate code (for example, sum(trip_distance)).
RateCodeStatus – The second job that runs on the EMR cluster, but only if the first job finishes successfully. This job depends on two different input sets:
csv – The same trip data that is used for the first Spark job.
miles-per-rate – The output of the first job.
This job first reads the tripdata.csv file and aggregates the fare_amount by the rate_code. After this point, you have two different datasets, both aggregated by rate_code. Finally, the job uses the rate_code field to join two datasets and output the entire rate code status in a single CSV file.
The output columns are as follows:
rate_code_id – Represents the rate code type.
total_distance – Derived from first Spark job and represents the total trip distance.
total_fare_amount – A new field that is generated during the second Spark application, representing the total fare amount by the rate code type.
Note that in this case, you don’t need to run two different Spark jobs to generate that output. The goal of setting up the jobs in this way is just to create a dependency between the two jobs and use them within AWS Step Functions.
Both Spark applications take one input argument called rootPath. It’s the S3 location where the Spark job is stored along with input and output data. Here is a sample of the final output:
The next section discusses how you can use Apache Livy to interact with Spark applications that are running on Amazon EMR.
Using Apache Livy to interact with Apache Spark
Apache Livy provides a REST interface to interact with Spark running on an EMR cluster. Livy is included in Amazon EMR release version 5.9.0 and later. In this post, I use Livy to submit Spark jobs and retrieve job status. When Amazon EMR is launched with Livy installed, the EMR master node becomes the endpoint for Livy, and it starts listening on port 8998 by default. Livy provides APIs to interact with Spark.
Let’s look at a couple of examples how you can interact with Spark running on Amazon EMR using Livy.
To list active running jobs, you can execute the following from the EMR master node:
curl localhost:8998/sessions
If you want to do the same from a remote instance, just change localhost to the EMR hostname, as in the following (port 8998 must be open to that remote instance through the security group):
Through Spark submit, you can pass multiple arguments for the Spark job and Spark configuration settings. You can also do that using Livy, by passing the S3 path through the args parameter, as shown following:
For a detailed list of Livy APIs, see the Apache Livy REST API page. This post uses GET /batches and POST /batches.
In the next section, you create a state machine and orchestrate Spark applications using AWS Step Functions.
Using AWS Step Functions to create a Spark job workflow
AWS Step Functions automatically triggers and tracks each step and retries when it encounters errors. So your application executes in order and as expected every time. To create a Spark job workflow using AWS Step Functions, you first create a Lambda state machine using different types of states to create the entire workflow.
First, you use the Task state—a simple state in AWS Step Functions that performs a single unit of work. You also use the Wait state to delay the state machine from continuing for a specified time. Later, you use the Choice state to add branching logic to a state machine.
The following is a quick summary of how to use different states in the state machine to create the Spark ETL pipeline:
Task state – Invokes a Lambda function. The first Task state submits the Spark job on Amazon EMR, and the next Task state is used to retrieve the previous Spark job status.
Wait state – Pauses the state machine until a job completes execution.
Choice state – Each Spark job execution can return a failure, an error, or a success state So, in the state machine, you use the Choice state to create a rule that specifies the next action or step based on the success or failure of the previous step.
Here is one of my Task states, MilesPerRateCode, which simply submits a Spark job:
"MilesPerRate Job": {
"Type": "Task",
"Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:blog-miles-per-rate-job-submit-function",
"ResultPath": "$.jobId",
"Next": "Wait for MilesPerRate job to complete"
}
This Task state configuration specifies the Lambda function to execute. Inside the Lambda function, it submits a Spark job through Livy using Livy’s POST API. Using ResultPath, it tells the state machine where to place the result of the executing task. As discussed in the previous section, Spark submit returns the session ID, which is captured with $.jobId and used in a later state.
The following code section shows the Lambda function, which is used to submit the MilesPerRateCode job. It uses the Python request library to submit a POST against the Livy endpoint hosted on Amazon EMR and passes the required parameters in JSON format through payload. It then parses the response, grabs id from the response, and returns it. The Next field tells the state machine which state to go to next.
Just like in the MilesPerRate job, another state submits the RateCodeStatus job, but it executes only when all previous jobs have completed successfully.
Here is the Task state in the state machine that checks the Spark job status:
Just like other states, the preceding Task executes a Lambda function, captures the result (represented by jobStatus), and passes it to the next state. The following is the Lambda function that checks the Spark job status based on a given session ID:
In the Choice state, it checks the Spark job status value, compares it with a predefined state status, and transitions the state based on the result. For example, if the status is success, move to the next state (RateCodeJobStatus job), and if it is dead, move to the MilesPerRate job failed state.
To set up this entire solution, you need to create a few AWS resources. To make it easier, I have created an AWS CloudFormation template. This template creates all the required AWS resources and configures all the resources that are needed to create a Spark-based ETL pipeline on AWS Step Functions.
This CloudFormation template requires you to pass the following four parameters during initiation.
Parameter
Description
ClusterSubnetID
The subnet where the Amazon EMR cluster is deployed and Lambda is configured to talk to this subnet.
KeyName
The name of the existing EC2 key pair to access the Amazon EMR cluster.
VPCID
The ID of the virtual private cloud (VPC) where the EMR cluster is deployed and Lambda is configured to talk to this VPC.
S3RootPath
The Amazon S3 path where all required files (input file, Spark job, and so on) are stored and the resulting data is written.
IMPORTANT: These templates are designed only to show how you can create a Spark-based ETL pipeline on AWS Step Functions using Apache Livy. They are not intended for production use without modification. And if you try this solution outside of the us-east-1 Region, download the necessary files from s3://aws-data-analytics-blog/emr-step-functions, upload the files to the buckets in your Region, edit the script as appropriate, and then run it.
To launch the CloudFormation stack, choose Launch Stack:
Launching this stack creates the following list of AWS resources.
Logical ID
Resource Type
Description
StepFunctionsStateExecutionRole
IAM role
IAM role to execute the state machine and have a trust relationship with the states service.
SparkETLStateMachine
AWS Step Functions state machine
State machine in AWS Step Functions for the Spark ETL workflow.
LambdaSecurityGroup
Amazon EC2 security group
Security group that is used for the Lambda function to call the Livy API.
RateCodeStatusJobSubmitFunction
AWS Lambda function
Lambda function to submit the RateCodeStatus job.
MilesPerRateJobSubmitFunction
AWS Lambda function
Lambda function to submit the MilesPerRate job.
SparkJobStatusFunction
AWS Lambda function
Lambda function to check the Spark job status.
LambdaStateMachineRole
IAM role
IAM role for all Lambda functions to use the lambda trust relationship.
EMRCluster
Amazon EMR cluster
EMR cluster where Livy is running and where the job is placed.
During the AWS CloudFormation deployment phase, it sets up S3 paths for input and output. Input files are stored in the <<s3-root-path>>/emr-step-functions/input/ path, whereas spark-taxi.jar is copied under <<s3-root-path>>/emr-step-functions/.
The following screenshot shows how the S3 paths are configured after deployment. In this example, I passed a bucket that I created in the AWS account s3://tm-app-demos for the S3 root path.
If the CloudFormation template completed successfully, you will see Spark-ETL-State-Machine in the AWS Step Functions dashboard, as follows:
Choose the Spark-ETL-State-Machine state machine to take a look at this implementation. The AWS CloudFormation template built the entire state machine along with its dependent Lambda functions, which are now ready to be executed.
On the dashboard, choose the newly created state machine, and then choose New execution to initiate the state machine. It asks you to pass input in JSON format. This input goes to the first state MilesPerRate Job, which eventually executes the Lambda function blog-miles-per-rate-job-submit-function.
Pass the S3 root path as input:
{
“rootPath”: “s3://tm-app-demos”
}
Then choose Start Execution:
The rootPath value is the same value that was passed when creating the CloudFormation stack. It can be an S3 bucket location or a bucket with prefixes, but it should be the same value that is used for AWS CloudFormation. This value tells the state machine where it can find the Spark jar and input file, and where it will write output files. After the state machine starts, each state/task is executed based on its definition in the state machine.
At a high level, the following represents the flow of events:
Execute the first Spark job, MilesPerRate.
The Spark job reads the input file from the location <<rootPath>>/emr-step-functions/input/tripdata.csv. If the job finishes successfully, it writes the output data to <<rootPath>>/emr-step-functions/miles-per-rate.
If the Spark job fails, it transitions to the error state MilesPerRate job failed, and the state machine stops. If the Spark job finishes successfully, it transitions to the RateCodeStatus Job state, and the second Spark job is executed.
If the second Spark job fails, it transitions to the error state RateCodeStatus job failed, and the state machine stops with the Failed status.
If this Spark job completes successfully, it writes the final output data to the <<rootPath>>/emr-step-functions/rate-code-status/ It also transitions the RateCodeStatus job finished state, and the state machine ends its execution with the Success status.
This following screenshot shows a successfully completed Spark ETL state machine:
The right side of the state machine diagram shows the details of individual states with their input and output.
When you execute the state machine for the second time, it fails because the S3 path already exists. The state machine turns red and stops at MilePerRate job failed. The following image represents that failed execution of the state machine:
You can also check your Spark application status and logs by going to the Amazon EMR console and viewing the Application history tab:
I hope this walkthrough paints a picture of how you can create a serverless solution for orchestrating Spark jobs on Amazon EMR using AWS Step Functions and Apache Livy. In the next section, I share some ideas for making this solution even more elegant.
Next steps
The goal of this post is to show a simple example that uses AWS Step Functions to create an orchestration for Spark-based jobs in a serverless fashion. To make this solution robust and production ready, you can explore the following options:
In this example, I manually initiated the state machine by passing the rootPath as input. You can instead trigger the state machine automatically. To run the ETL pipeline as soon as the files arrive in your S3 bucket, you can pass the new file path to the state machine. Because CloudWatch Events supports AWS Step Functions as a target, you can create a CloudWatch rule for an S3 event. You can then set AWS Step Functions as a target and pass the new file path to your state machine. You’re all set!
You can also improve this solution by adding an alerting mechanism in case of failures. To do this, create a Lambda function that sends an alert email and assigns that Lambda function to a Fail That way, when any part of your state fails, it triggers an email and notifies the user.
If you want to submit multiple Spark jobs in parallel, you can use the Parallel state type in AWS Step Functions. The Parallel state is used to create parallel branches of execution in your state machine.
With Lambda and AWS Step Functions, you can create a very robust serverless orchestration for your big data workload.
Cleaning up
When you’ve finished testing this solution, remember to clean up all those AWS resources that you created using AWS CloudFormation. Use the AWS CloudFormation console or AWS CLI to delete the stack named Blog-Spark-ETL-Step-Functions.
Summary
In this post, I showed you how to use AWS Step Functions to orchestrate your Spark jobs that are running on Amazon EMR. You used Apache Livy to submit jobs to Spark from a Lambda function and created a workflow for your Spark jobs, maintaining a specific order for job execution and triggering different AWS events based on your job’s outcome. Go ahead—give this solution a try, and share your experience with us!
Tanzir Musabbir is an EMR Specialist Solutions Architect with AWS. He is an early adopter of open source Big Data technologies. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.
When I talk with customers and partners, I find that they are in different stages in the adoption of DevOps methodologies. They are automating the creation of application artifacts and the deployment of their applications to different infrastructure environments. In many cases, they are creating and supporting multiple applications using a variety of coding languages and artifacts.
The management of these processes and artifacts can be challenging, but using the right tools and methodologies can simplify the process.
In this post, I will show you how you can automate the creation and storage of application artifacts through the implementation of a pipeline and custom deploy action in AWS CodePipeline. The example includes a Node.js code base stored in an AWS CodeCommit repository. A Node Package Manager (npm) artifact is built from the code base, and the build artifact is published to a JFrogArtifactory npm repository.
I frequently recommend AWS CodePipeline, the AWS continuous integration and continuous delivery tool. You can use it to quickly innovate through integration and deployment of new features and bug fixes by building a workflow that automates the build, test, and deployment of new versions of your application. And, because AWS CodePipeline is extensible, it allows you to create a custom action that performs customized, automated actions on your behalf.
JFrog’s Artifactory is a universal binary repository manager where you can manage multiple applications, their dependencies, and versions in one place. Artifactory also enables you to standardize the way you manage your package types across all applications developed in your company, no matter the code base or artifact type.
If you already have a Node.js CodeCommit repository, a JFrog Artifactory host, and would like to automate the creation of the pipeline, including the custom action and CodeBuild project, you can use this AWS CloudFormationtemplate to create your AWS CloudFormation stack.
This figure shows the path defined in the pipeline for this project. It starts with a change to Node.js source code committed to a private code repository in AWS CodeCommit. With this change, CodePipeline triggers AWS CodeBuild to create the npm package from the node.js source code. After the build, CodePipeline triggers the custom action job worker to commit the build artifact to the designated artifact repository in Artifactory.
This blog post assumes you have already:
· Created a CodeCommit repository that contains a Node.js project.
· Configured a two-stage pipeline in AWS CodePipeline.
The Source stage of the pipeline is configured to poll the Node.js CodeCommit repository. The Build stage is configured to use a CodeBuild project to build the npm package using a buildspec.yml file located in the code repository.
If you do not have a Node.js repository, you can create a CodeCommit repository that contains this simple ‘Hello World’ project. This project also includes a buildspec.yml file that is used when you define your CodeBuild project. It defines the steps to be taken by CodeBuild to create the npm artifact.
If you do not already have a pipeline set up in CodePipeline, you can use this template to create a pipeline with a CodeCommit source action and a CodeBuild build action through the AWS Command Line Interface (AWS CLI). If you do not want to install the AWS CLI on your local machine, you can use AWS Cloud9, our managed integrated development environment (IDE), to interact with AWS APIs.
In your development environment, open your favorite editor and fill out the template with values appropriate to your project. For information, see the readme in the GitHub repository.
Use this CLI command to create the pipeline from the template:
It creates a pipeline that has a CodeCommit source action and a CodeBuild build action.
Integrating JFrog Artifactory
JFrog Artifactory provides default repositories for your project needs. For my NPM package repository, I am using the default virtual npm repository (named npm) that is available in Artifactory Pro. You might want to consider creating a repository per project but for the example used in this post, using the default lets me get started without having to configure a new repository.
I can use the steps in the Set Me Up -> npm section on the landing page to configure my worker to interact with the default NPM repository.
Describes the required values to run the custom action. I will define my custom action in the ‘Deploy’ category, identify the provider as ‘Artifactory’, of version ‘1’, and specify a variety of configurationProperties whose values will be defined when this stage is added to my pipeline.
Polls CodePipeline for a job, scanning for its action-definition properties. In this blog post, after a job has been found, the job worker does the work required to publish the npm artifact to the Artifactory repository.
{
"category": "Deploy",
"configurationProperties": [{
"name": "TypeOfArtifact",
"required": true,
"key": true,
"secret": false,
"description": "Package type, ex. npm for node packages",
"type": "String"
},
{ "name": "RepoKey",
"required": true,
"key": true,
"secret": false,
"type": "String",
"description": "Name of the repository in which this artifact should be stored"
},
{ "name": "UserName",
"required": true,
"key": true,
"secret": false,
"type": "String",
"description": "Username for authenticating with the repository"
},
{ "name": "Password",
"required": true,
"key": true,
"secret": true,
"type": "String",
"description": "Password for authenticating with the repository"
},
{ "name": "EmailAddress",
"required": true,
"key": true,
"secret": false,
"type": "String",
"description": "Email address used to authenticate with the repository"
},
{ "name": "ArtifactoryHost",
"required": true,
"key": true,
"secret": false,
"type": "String",
"description": "Public address of Artifactory host, ex: https://myexamplehost.com or http://myexamplehost.com:8080"
}],
"provider": "Artifactory",
"version": "1",
"settings": {
"entityUrlTemplate": "{Config:ArtifactoryHost}/artifactory/webapp/#/artifacts/browse/tree/General/{Config:RepoKey}"
},
"inputArtifactDetails": {
"maximumCount": 5,
"minimumCount": 1
},
"outputArtifactDetails": {
"maximumCount": 5,
"minimumCount": 0
}
}
There are seven sections to the custom action definition:
category: This is the stage in which you will be creating this action. It can be Source, Build, Deploy, Test, Invoke, Approval. Except for source actions, the category section simply allows us to organize our actions. I am setting the category for my action as ‘Deploy’ because I’m using it to publish my node artifact to my Artifactory instance.
configurationProperties: These are the parameters or variables required for your project to authenticate and commit your artifact. In the case of my custom worker, I need:
TypeOfArtifact: In this case, npm, because it’s for the Node Package Manager.
RepoKey: The name of the repository. In this case, it’s the default npm.
UserName and Password for the user to authenticate with the Artifactory repository.
EmailAddress used to authenticate with the repository.
Artifactory host name or IP address.
provider: The name you define for your custom action stage. I have named the provider Artifactory.
version: Version number for the custom action. Because this is the first version, I set the version number to 1.
entityUrlTemplate: This URL is presented to your users for the deploy stage along with the title you define in your provider. The link takes the user to their artifact repository page in the Artifactory host.
inputArtifactDetails: The number of artifacts to expect from the previous stage in the pipeline.
outputArtifactDetails: The number of artifacts that should be the result from the custom action stage. Later in this blog post, I define 0 for my output artifacts because I am publishing the artifact to the Artifactory repository as the final action.
After I define the custom action in a JSON file, I use the AWS CLI to create the custom action type in CodePipeline:
After I create the custom action type in the same region as my pipeline, I edit the pipeline to add a Deploy stage and configure it to use the custom action I created for Artifactory:
I have created a custom worker for the actions required to commit the npm artifact to the Artifactory repository. The worker is in Python and it runs in a loop on an Amazon EC2 instance. My custom worker polls for a deploy job and publishes the NPM artifact to the Artifactory repository.
The EC2 instance is running Amazon Linux and has an IAM instance role attached that gives the worker permission to access CodePipeline. The worker process is as follows:
Take the configuration properties from the custom worker and poll CodePipeline for a custom action job.
After there is a job in the job queue with the appropriate category, provider, and version, acknowledge the job.
Download the zipped artifact created in the previous Build stage from the provided S3 buckets with the provided temporary credentials.
Unzip the artifact into a temporary directory.
A user-defined Artifactory user name and password is used to receive a temporary API key from Artifactory.
To avoid having to write the password to a file, use that temporary API key and user name to authenticate with the NPM repository.
Publish the Node.js package to the specified repository.
Because I am running my custom worker on an Amazon Linux EC2 instance, I installed npm with the following command:
sudo yum install nodejs npm --enablerepo=epel
For my custom worker, I used pip to install the required Python libraries:
pip install boto3 requests
For a full Python package list, see requirements.txt in the GitHub repository.
Let’s take a look at some of the code snippets from the worker.
First, the worker polls for jobs:
def action_type():
ActionType = {
'category': 'Deploy',
'owner': 'Custom',
'provider': 'Artifactory',
'version': '1' }
return(ActionType)
def poll_for_jobs():
try:
artifactory_action_type = action_type()
print(artifactory_action_type)
jobs = codepipeline.poll_for_jobs(actionTypeId=artifactory_action_type)
while not jobs['jobs']:
time.sleep(10)
jobs = codepipeline.poll_for_jobs(actionTypeId=artifactory_action_type)
if jobs['jobs']:
print('Job found')
return jobs['jobs'][0]
except ClientError as e:
print("Received an error: %s" % str(e))
raise
When there is a job in the queue, the poller returns a number of values from the queue such as jobId, the input and output S3 buckets for artifacts, temporary credentials to access the S3 buckets, and other configuration details from the stage in the pipeline.
After successfully receiving the job details, the worker sends an acknowledgement to CodePipeline to ensure that the work on the job is not duplicated by other workers watching for the same job:
def job_acknowledge(jobId, nonce):
try:
print('Acknowledging job')
result = codepipeline.acknowledge_job(jobId=jobId, nonce=nonce)
return result
except Exception as e:
print("Received an error when trying to acknowledge the job: %s" % str(e))
raise
With the job now acknowledged, the worker publishes the source code artifact into the desired repository. The worker gets the value of the artifact S3 bucket and objectKey from the inputArtifacts in the response from the poll_for_jobs API request. Next, the worker creates a new directory in /tmp and downloads the S3 object into this directory:
def get_bucket_location(bucketName, init_client):
region = init_client.get_bucket_location(Bucket=bucketName)['LocationConstraint']
if not region:
region = 'us-east-1'
return region
def get_s3_artifact(bucketName, objectKey, ak, sk, st):
init_s3 = boto3.client('s3')
region = get_bucket_location(bucketName, init_s3)
session = Session(aws_access_key_id=ak,
aws_secret_access_key=sk,
aws_session_token=st)
s3 = session.resource('s3',
region_name=region,
config=botocore.client.Config(signature_version='s3v4'))
try:
tempdirname = tempfile.mkdtemp()
except OSError as e:
print('Could not write temp directory %s' % tempdirname)
raise
bucket = s3.Bucket(bucketName)
obj = bucket.Object(objectKey)
filename = tempdirname + '/' + objectKey
try:
if os.path.dirname(objectKey):
directory = os.path.dirname(filename)
os.makedirs(directory)
print('Downloading the %s object and writing it to disk in %s location' % (objectKey, tempdirname))
with open(filename, 'wb') as data:
obj.download_fileobj(data)
except ClientError as e:
print('Downloading the object and writing the file to disk raised this error: ' + str(e))
raise
return(filename, tempdirname)
Because the downloaded artifact from S3 is a zip file, the worker must unzip it first. To have a clean area in which to work, I extract the downloaded zip archive into a new directory:
def unzip_codepipeline_artifact(artifact, origtmpdir):
# create a new temp directory
# Unzip artifact into new directory
try:
newtempdir = tempfile.mkdtemp()
print('Extracting artifact %s into temporary directory %s' % (artifact, newtempdir))
zip_ref = zipfile.ZipFile(artifact, 'r')
zip_ref.extractall(newtempdir)
zip_ref.close()
shutil.rmtree(origtmpdir)
return(os.listdir(newtempdir), newtempdir)
except OSError as e:
if e.errno != errno.EEXIST:
shutil.rmtree(newtempdir)
raise
The worker now has the npm package that I want to store in my Artifactory NPM repository.
To authenticate with the NPM repository, the worker requests a temporary token from the Artifactory host. After receiving this temporary token, it creates a .npmrc file in the worker user’s home directory that includes a hash of the user name and temporary token. After it has authenticated, the worker runs npm config set registry <URL OF REPOSITORY> to configure the npm registry value to be the Artifactory host. Next, the worker runs npm publish –registry <URL OF REPOSITORY>, which publishes the node package to the NPM repository in the Artifactory host.
def push_to_npm(configuration, artifact_list, temp_dir, jobId):
reponame = configuration['RepoKey']
art_type = configuration['TypeOfArtifact']
print("Putting artifact into NPM repository " + reponame)
token, hostname, username = gen_artifactory_auth_token(configuration)
npmconfigfile = create_npmconfig_file(configuration, username, token)
url = hostname + '/artifactory/api/' + art_type + '/' + reponame
print("Changing directory to " + str(temp_dir))
os.chdir(temp_dir)
try:
print("Publishing following files to the repository: %s " % os.listdir(temp_dir))
print("Sending artifact to Artifactory NPM registry URL: " + url)
subprocess.call(["npm", "config", "set", "registry", url])
req = subprocess.call(["npm", "publish", "--registry", url])
print("Return code from npm publish: " + str(req))
if req != 0:
err_msg = "npm ERR! Recieved non OK response while sending response to Artifactory. Return code from npm publish: " + str(req)
signal_failure(jobId, err_msg)
else:
signal_success(jobId)
except requests.exceptions.RequestException as e:
print("Received an error when trying to commit artifact %s to repository %s: " % (str(art_type), str(configuration['RepoKey']), str(e)))
raise
return(req, npmconfigfile)
If the return value from publishing to the repository is not 0, the worker signals a failure to CodePipeline. If the value is 0, the worker signals success to CodePipeline to indicate that the stage of the pipeline has been completed successfully.
For the custom worker code, see npm_job_worker.py in the GitHub repository.
I run my custom worker on an EC2 instance using the command python npm_job_worker.py, with an optional --version flag that can be used to specify worker versions other than 1. Then I trigger a release change in my pipeline:
From my custom worker output logs, I have just committed a package named node_example at version 1.0.3:
On artifact: index.js
Committing to the repo: https://artifactory.myexamplehost.com/artifactory/api/npm/npm
Sending artifact to Artifactory URL: https:// artifactoryhost.myexamplehost.com/artifactory/api/npm/npm
npm config: 0
npm http PUT https://artifactory.myexamplehost.com/artifactory/api/npm/npm/node_example
npm http 201 https://artifactory.myexamplehost.com/artifactory/api/npm/npm/node_example
+ [email protected]
Return code from npm publish: 0
Signaling success to CodePipeline
After that has been built successfully, I can find my artifact in my Artifactory repository:
To help you automate this process, I have created this AWS CloudFormation template that automates the creation of the CodeBuild project, the custom action, and the CodePipeline pipeline. It also launches the Amazon EC2-based custom job worker in an AWS Auto Scaling group. This template requires you to have a VPC and CodeCommit repository for your Node.js project. If you do not currently have a VPC in which you want to run your custom worker EC2 instances, you can use this AWS QuickStart to create one. If you do not have an existing Node.js project, I’ve provided a sample project in the GitHub repository.
Conclusion
I‘ve shown you the steps to integrate your JFrog Artifactory repository with your CodePipeline workflow. I’ve shown you how to create a custom action in CodePipeline and how to create a custom worker that works in your CI/CD pipeline. To dig deeper into custom actions and see how you can integrate your Artifactory repositories into your AWS CodePipeline projects, check out the full code base on GitHub.
If you have any questions or feedback, feel free to reach out to us through the AWS CodePipeline forum.
Erin McGill is a Solutions Architect in the AWS Partner Program with a focus on DevOps and automation tooling.
Slack is widely used by DevOps and development teams to communicate status. Typically, when a build has been tested and is ready to be promoted to a staging environment, a QA engineer or DevOps engineer kicks off the deployment. Using Slack in a ChatOps collaboration model, the promotion can be done in a single click from a Slack channel. And because the promotion happens through a Slack channel, the whole development team knows what’s happening without checking email.
In this blog post, I will show you how to integrate AWS services with a Slack application. I use an interactive message button and incoming webhook to promote a stage with a single click.
To follow along with the steps in this post, you’ll need a pipeline in AWS CodePipeline. If you don’t have a pipeline, the fastest way to create one for this use case is to use AWS CodeStar. Go to the AWS CodeStar console and select the Static Website template (shown in the screenshot). AWS CodeStar will create a pipeline with an AWS CodeCommit repository and an AWS CodeDeploy deployment for you. After the pipeline is created, you will need to add a manual approval stage.
You’ll also need to build a Slack app with webhooks and interactive components, write two Lambda functions, and create an API Gateway API and a SNS topic.
As you’ll see in the following diagram, when I make a change and merge a new feature into the master branch in AWS CodeCommit, the check-in kicks off my CI/CD pipeline in AWS CodePipeline. When CodePipeline reaches the approval stage, it sends a notification to Amazon SNS, which triggers an AWS Lambda function (ApprovalRequester).
The Slack channel receives a prompt that looks like the following screenshot. When I click Yes to approve the build promotion, the approval result is sent to CodePipeline through API Gateway and Lambda (ApprovalHandler). The pipeline continues on to deploy the build to the next environment.
Create a Slack app
For App Name, type a name for your app. For Development Slack Workspace, choose the name of your workspace. You’ll see in the following screenshot that my workspace is AWS ChatOps.
After the Slack application has been created, you will see the Basic Information page, where you can create incoming webhooks and enable interactive components.
To add incoming webhooks:
Under Add features and functionality, choose Incoming Webhooks. Turn the feature on by selecting Off, as shown in the following screenshot.
Now that the feature is turned on, choose Add New Webhook to Workspace. In the process of creating the webhook, Slack lets you choose the channel where messages will be posted.
After the webhook has been created, you’ll see its URL. You will use this URL when you create the Lambda function.
If you followed the steps in the post, the pipeline should look like the following.
Write the Lambda function for approval requests
This Lambda function is invoked by the SNS notification. It sends a request that consists of an interactive message button to the incoming webhook you created earlier. The following sample code sends the request to the incoming webhook. WEBHOOK_URL and SLACK_CHANNEL are the environment variables that hold values of the webhook URL that you created and the Slack channel where you want the interactive message button to appear.
# This function is invoked via SNS when the CodePipeline manual approval action starts.
# It will take the details from this approval notification and sent an interactive message to Slack that allows users to approve or cancel the deployment.
import os
import json
import logging
import urllib.parse
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# This is passed as a plain-text environment variable for ease of demonstration.
# Consider encrypting the value with KMS or use an encrypted parameter in Parameter Store for production deployments.
SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
SLACK_CHANNEL = os.environ['SLACK_CHANNEL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
message = event["Records"][0]["Sns"]["Message"]
data = json.loads(message)
token = data["approval"]["token"]
codepipeline_name = data["approval"]["pipelineName"]
slack_message = {
"channel": SLACK_CHANNEL,
"text": "Would you like to promote the build to production?",
"attachments": [
{
"text": "Yes to deploy your build to production",
"fallback": "You are unable to promote a build",
"callback_id": "wopr_game",
"color": "#3AA3E3",
"attachment_type": "default",
"actions": [
{
"name": "deployment",
"text": "Yes",
"style": "danger",
"type": "button",
"value": json.dumps({"approve": True, "codePipelineToken": token, "codePipelineName": codepipeline_name}),
"confirm": {
"title": "Are you sure?",
"text": "This will deploy the build to production",
"ok_text": "Yes",
"dismiss_text": "No"
}
},
{
"name": "deployment",
"text": "No",
"type": "button",
"value": json.dumps({"approve": False, "codePipelineToken": token, "codePipelineName": codepipeline_name})
}
]
}
]
}
req = Request(SLACK_WEBHOOK_URL, json.dumps(slack_message).encode('utf-8'))
response = urlopen(req)
response.read()
return None
Create a SNS topic
Create a topic and then create a subscription that invokes the ApprovalRequester Lambda function. You can configure the manual approval action in the pipeline to send a message to this SNS topic when an approval action is required. When the pipeline reaches the approval stage, it sends a notification to this SNS topic. SNS publishes a notification to all of the subscribed endpoints. In this case, the Lambda function is the endpoint. Therefore, it invokes and executes the Lambda function. For information about how to create a SNS topic, see Create a Topic in the Amazon SNS Developer Guide.
Write the Lambda function for handling the interactive message button
This Lambda function is invoked by API Gateway. It receives the result of the interactive message button whether or not the build promotion was approved. If approved, an API call is made to CodePipeline to promote the build to the next environment. If not approved, the pipeline stops and does not move to the next stage.
The Lambda function code might look like the following. SLACK_VERIFICATION_TOKEN is the environment variable that contains your Slack verification token. You can find your verification token under Basic Information on Slack manage app page. When you scroll down, you will see App Credential. Verification token is found under the section.
# This function is triggered via API Gateway when a user acts on the Slack interactive message sent by approval_requester.py.
from urllib.parse import parse_qs
import json
import os
import boto3
SLACK_VERIFICATION_TOKEN = os.environ['SLACK_VERIFICATION_TOKEN']
#Triggered by API Gateway
#It kicks off a particular CodePipeline project
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
body = parse_qs(event['body'])
payload = json.loads(body['payload'][0])
# Validate Slack token
if SLACK_VERIFICATION_TOKEN == payload['token']:
send_slack_message(json.loads(payload['actions'][0]['value']))
# This will replace the interactive message with a simple text response.
# You can implement a more complex message update if you would like.
return {
"isBase64Encoded": "false",
"statusCode": 200,
"body": "{\"text\": \"The approval has been processed\"}"
}
else:
return {
"isBase64Encoded": "false",
"statusCode": 403,
"body": "{\"error\": \"This request does not include a vailid verification token.\"}"
}
def send_slack_message(action_details):
codepipeline_status = "Approved" if action_details["approve"] else "Rejected"
codepipeline_name = action_details["codePipelineName"]
token = action_details["codePipelineToken"]
client = boto3.client('codepipeline')
response_approval = client.put_approval_result(
pipelineName=codepipeline_name,
stageName='Approval',
actionName='ApprovalOrDeny',
result={'summary':'','status':codepipeline_status},
token=token)
print(response_approval)
Create the API Gateway API
In the Amazon API Gateway console, create a resource called InteractiveMessageHandler.
Create a POST method.
For Integration type, choose Lambda Function.
Select Use Lambda Proxy integration.
From Lambda Region, choose a region.
In Lambda Function, type a name for your function.
Now go back to your Slack application and enable interactive components.
To enable interactive components for the interactive message (Yes) button:
Under Features, choose Interactive Components.
Choose Enable Interactive Components.
Type a request URL in the text box. Use the invoke URL in Amazon API Gateway that will be called when the approval button is clicked.
Now that all the pieces have been created, run the solution by checking in a code change to your CodeCommit repo. That will release the change through CodePipeline. When the CodePipeline comes to the approval stage, it will prompt to your Slack channel to see if you want to promote the build to your staging or production environment. Choose Yes and then see if your change was deployed to the environment.
Conclusion
That is it! You have now created a Slack ChatOps solution using AWS CodeCommit, AWS CodePipeline, AWS Lambda, Amazon API Gateway, and Amazon Simple Notification Service.
Now that you know how to do this Slack and CodePipeline integration, you can use the same method to interact with other AWS services using API Gateway and Lambda. You can also use Slack’s slash command to initiate an action from a Slack channel, rather than responding in the way demonstrated in this post.
Earlier this year on 3 and 4 March, communities around the world held Raspberry Jam events to celebrate Raspberry Pi’s sixth birthday. We sent out special birthday kits to participating Jams — it was amazing to know the kits would end up in the hands of people in parts of the world very far from Raspberry Pi HQ in Cambridge, UK.
The Raspberry Jam Camer team: Damien Doumer, Eyong Etta, Loïc Dessap and Lionel Sichom, aka Lionel Tellem
Preparing for the #PiParty
One birthday kit went to Yaoundé, the capital of Cameroon. There, a team of four students in their twenties — Lionel Sichom (aka Lionel Tellem), Eyong Etta, Loïc Dessap, and Damien Doumer — were organising Yaoundé’s first Jam, called Raspberry Jam Camer, as part of the Raspberry Jam Big Birthday Weekend. The team knew one another through their shared interests and skills in electronics, robotics, and programming. Damien explains in his blog post about the Jam that they planned ahead for several activities for the Jam based on their own projects, so they could be confident of having a few things that would definitely be successful for attendees to do and see.
Show-and-tell at Raspberry Jam Cameroon
Loïc presented a Raspberry Pi–based, Android app–controlled robot arm that he had built, and Lionel coded a small video game using Scratch on Raspberry Pi while the audience watched. Damien demonstrated the possibilities of Windows 10 IoT Core on Raspberry Pi, showing how to install it, how to use it remotely, and what you can do with it, including building a simple application.
Loïc showcases the prototype robot arm he built
There was lots more too, with others discussing their own Pi projects and talking about the possibilities Raspberry Pi offers, including a Pi-controlled drone and car. Cake was a prevailing theme of the Raspberry Jam Big Birthday Weekend around the world, and Raspberry Jam Camer made sure they didn’t miss out.
Yay, birthday cake!!
A big success
Most visitors to the Jam were secondary school students, while others were university students and graduates. The majority were unfamiliar with Raspberry Pi, but all wanted to learn about Raspberry Pi and what they could do with it. Damien comments that the fact most people were new to Raspberry Pi made the event more interactive rather than creating any challenges, because the visitors were all interested in finding out about the little computer. The Jam was an all-round success, and the team was pleased with how it went:
What I liked the most was that we sensitized several people about the Raspberry Pi and what one can be capable of with such a small but powerful device. — Damien Doumer
The Jam team rounded off the event by announcing that this was the start of a Raspberry Pi community in Yaoundé. They hope that they and others will be able to organise more Jams and similar events in the area to spread the word about what people can do with Raspberry Pi, and to help them realise their ideas.
Raspberry Jam Camer gets the thumbs-up
The Raspberry Pi community in Cameroon
In a French-language interview about their Jam, the team behind Raspberry Jam Camer said they’d like programming to become the third official language of Cameroon, after French and English; their aim is to to popularise programming and digital making across Cameroonian society. Neither of these fields is very familiar to most people in Cameroon, but both are very well aligned with the country’s ambitions for development. The team is conscious of the difficulties around the emergence of information and communication technologies in the Cameroonian context; in response, they are seizing the opportunities Raspberry Pi offers to give children and young people access to modern and constantly evolving technology at low cost.
Thanks to Lionel, Eyong, Damien, and Loïc, and to everyone who helped put on a Jam for the Big Birthday Weekend! Remember, anyone can start a Jam at any time — and we provide plenty of resources to get you started. Check out the Guidebook, the Jam branding pack, our specially-made Jam activities online (in multiplelanguages), printable worksheets, and more.
Thanks to Susan Ferrell, Senior Technical Writer, for a great blog post on how to use CodeCommit branch-level permissions. —-
AWS CodeCommit users have been asking for a way to restrict commits to some repository branches to just a few people. In this blog post, we’re going to show you how to do that by creating and applying a conditional policy, an AWS Identity and Access Management (IAM) policy that contains a context key.
Why would I do this?
When you create a branch in an AWS CodeCommit repository, the branch is available, by default, to all repository users. Here are some scenarios in which refining access might help you:
You maintain a branch in a repository for production-ready code, and you don’t want to allow changes to this branch except from a select group of people.
You want to limit the number of people who can make changes to the default branch in a repository.
You want to ensure that pull requests cannot be merged to a branch except by an approved group of developers.
We’ll show you how to create a policy in IAM that prevents users from pushing commits to and merging pull requests to a branch named master. You’ll attach that policy to one group or role in IAM, and then test how users in that group are affected when that policy is applied. We’ll explain how it works, so you can create custom policies for your repositories.
What you need to get started
You’ll need to sign in to AWS with sufficient permissions to:
Create and apply policies in IAM.
Create groups in IAM.
Add users to those groups.
Apply policies to those groups.
You can use existing IAM groups, but because you’re going to be changing permissions, you might want to first test this out on groups and users you’ve created specifically for this purpose.
You’ll need a repository in AWS CodeCommit with at least two branches: master and test-branch. For information about how to create repositories, see Create a Repository. For information about how to create branches, see Create a Branch. In this blog post, we’ve named the repository MyDemoRepo. You can use an existing repository with branches of another name, if you prefer.
Let’s get started!
Create two groups in IAM
We’re going to set up two groups in IAM: Developers and Senior_Developers. To start, both groups will have the same managed policy, AWSCodeCommitPowerUsers, applied. Users in each group will have exactly the same permissions to perform actions in IAM.
Figure 1: Two example groups in IAM, with distinct users but the same managed policy applied to each group
In the navigation pane, choose Groups, and then choose Create New Group.
In the Group Name box, type Developers, and then choose Next Step.
In the list of policies, select the check box for AWSCodeCommitPowerUsers, then choose Next Step.
Choose Create Group.
Now, follow these steps to create the Senior_Developers group and attach the AWSCodeCommitPowerUsers managed policy. You now have two empty groups with the same policy attached.
Create users in IAM
Next, add at least one unique user to each group. You can use existing IAM users, but because you’ll be affecting their access to AWS CodeCommit, you might want to create two users just for testing purposes. Let’s go ahead and create Arnav and Mary.
In the navigation pane, choose Users, and then choose Add user.
For the new user, type Arnav_Desai.
Choose Add another user, and then type Mary_Major.
Select the type of access (programmatic access, access to the AWS Management Console, or both). In this blog post, we’ll be testing everything from the console, but if you want to test AWS CodeCommit using the AWS CLI, make sure you include programmatic access and console access.
For Console password type, choose Custom password. Each user is assigned the password that you type in the box. Write these down so you don’t forget them. You’ll need to sign in to the console using each of these accounts.
Choose Next: Permissions.
On the Set permissions page, choose Add user to group. Add Arnav to the Developers group. Add Mary to the Senior_Developers group.
Choose Next: Review to see all of the choices you made up to this point. When you are ready to proceed, choose Create user.
Sign in as Arnav, and then follow these steps to go to the master branch and add a file. Then sign in as Mary and follow the same steps.
On the Dashboard page, from the list of repositories, choose MyDemoRepo.
In the Code view, choose the branch named master.
Choose Add file, and then choose Create file. Type some text or code in the editor.
Provide information to other users about who added this file to the repository and why.
In Author name, type the name of the user (Arnav or Mary).
In Email address, type an email address so that other repository users can contact you about this change.
In Commit message, type a brief description to help you remember why you added this file or any other details you might find helpful.
Type a name for the file.
Choose Commit file.
Now follow the same steps to add a file in a different branch. (In our example repository, that’s the branch named test-branch.) You should be able to add a file to both branches regardless of whether you’re signed in as Arnav or Mary.
Let’s change that.
Create a conditional policy in IAM
You’re going to create a policy in IAM that will deny API actions if certain conditions are met. We want to prevent users with this policy applied from updating a branch named master, but we don’t want to prevent them from viewing the branch, cloning the repository, or creating pull requests that will merge to that branch. For this reason, we want to pick and choose our APIs carefully. Looking at the Permissions Reference, the logical permissions for this are:
GitPush
PutFile
MergePullRequestByFastForward
Now’s the time to think about what else you might want this policy to do. For example, because we don’t want users with this policy to make changes to this branch, we probably don’t want them to be able to delete it either, right? So let’s add one more permission:
DeleteBranch
The branch in which we want to deny these actions is master. The repository in which the branch resides is MyDemoRepo. We’re going to need more than just the repository name, though. We need the repository ARN. Fortunately, that’s easy to find. Just go to the AWS CodeCommit console, choose the repository, and choose Settings. The repository ARN is displayed on the General tab.
Now we’re ready to create a policy. 1. Open the IAM console at https://console.aws.amazon.com/iam/. Make sure you’re signed in with the account that has sufficient permissions to create policies, and not as Arnav or Mary. 2. In the navigation pane, choose Policies, and then choose Create policy. 3. Choose JSON, and then paste in the following:
You’ll notice a few things here. First, change the repository ARN to the ARN for your repository and include the repository name. Second, if you want to restrict access to a branch with a name different from our example, master, change that reference too.
Now let’s talk about this policy and what it does. You might be wondering why we’re using a Git reference (refs/heads) value instead of just the branch name. The answer lies in how Git references things, and how AWS CodeCommit, as a Git-based repository service, implements its APIs. A branch in Git is a simple pointer (reference) to the SHA-1 value of the head commit for that branch.
You might also be wondering about the second part of the condition, the nullification language. This is necessary because of the way git push and git-receive-pack work. Without going into too many technical details, when you attempt to push a change from a local repo to AWS CodeCommit, an initial reference call is made to AWS CodeCommit without any branch information. AWS CodeCommit evaluates that initial call to ensure that:
a) You’re authorized to make calls.
b) A repository exists with the name specified in the initial call. If you left that null out of the policy, users with that policy would be unable to complete any pushes from their local repos to the AWS CodeCommit remote repository at all, regardless of which branch they were trying to push their commits to.
Could you write a policy in such a way that the null is not required? Of course. IAM policy language is flexible. There’s an example of how to do this in the AWS CodeCommit User Guide, if you’re curious. But for the purposes of this blog post, let’s continue with this policy as written.
So what have we essentially said in this policy? We’ve asked IAM to deny the relevant CodeCommit permissions if the request is made to the resource MyDemoRepo and it meets the following condition: the reference is to refs/heads/master. Otherwise, the deny does not apply.
I’m sure you’re wondering if this policy has to be constrained to a specific repository resource like MyDemoRepo. After all, it would be awfully convenient if a single policy could apply to all branches in any repository in an AWS account, particularly since the default branch in any repository is initially the master branch. Good news! Simply replace the ARN with an *, and your policy will affect ALL branches named master in every AWS CodeCommit repository in your AWS account. Make sure that this is really what you want, though. We suggest you start by limiting the scope to just one repository, and then changing things when you’ve tested it and are happy with how it works.
When you’re sure you’ve modified the policy for your environment, choose Review policy to validate it. Give this policy a name, such as DenyChangesToMaster, provide a description of its purpose, and then choose Create policy.
Now that you have a policy, it’s time to apply and test it.
Apply the policy to a group
In theory, you could apply the policy you just created directly to any IAM user, but that really doesn’t scale well. You should apply this policy to a group, if you use IAM groups to manage users, or to a role, if your users assume a role when interacting with AWS resources.
In the IAM console, choose Groups, and then choose Developers.
On the Permissions tab, choose Attach Policy.
Choose DenyChangesToMaster, and then choose Attach policy.
Your groups now have a critical difference: users in the Developers group have an additional policy applied that restricts their actions in the master branch. In other words, Mary can continue to add files, push commits, and merge pull requests in the master branch, but Arnav cannot.
Figure 2: Two example groups in IAM, one with an additional policy applied that will prevent users in this group from making changes to the master branch
Test it out. Sign in as Arnav, and do the following:
On the Dashboard page, from the list of repositories, choose MyDemoRepo.
In the Code view, choose the branch named master.
Choose Add file, and then choose Create file, just as you did before. Provide some text, and then add the file name and your user information.
Choose Commit file.
This time you’ll see an error after choosing Commit file. It’s not a pretty message, but at the very end, you’ll see a telling phrase: “explicit deny”. That’s the policy in action. You, as Arnav, are explicitly denied PutFile, which prevents you from adding a file to the master branch. You’ll see similar results if you try other actions denied by that policy, such as deleting the master branch.
Stay signed in as Arnav, but this time add a file to test-branch. You should be able to add a file without seeing any errors. You can create a branch based on the master branch, add a file to it, and create a pull request that will merge to the master branch, all just as before. However, you cannot perform denied actions on that master branch.
Sign out as Arnav and sign in as Mary. You’ll see that as that IAM user, you can add and edit files in the master branch, merge pull requests to it, and even, although we don’t recommend this, delete it.
Conclusion
You can use conditional statements in policies in IAM to refine how users interact with your AWS CodeCommit repositories. This blog post showed how to use such a policy to prevent users from making changes to a branch named master. There are many other options. We hope this blog post will encourage you to experiment with AWS CodeCommit, IAM policies, and permissions. If you have any questions or suggestions, we’d love to hear from you.
The EU’s General Data Protection Regulation (GDPR) describes data processor and data controller roles, and some customers and AWS Partner Network (APN) partners are asking how this affects the long-established AWS Shared Responsibility Model. I wanted to take some time to help folks understand shared responsibilities for us and for our customers in context of the GDPR.
How does the AWS Shared Responsibility Model change under GDPR? The short answer – it doesn’t. AWS is responsible for securing the underlying infrastructure that supports the cloud and the services provided; while customers and APN partners, acting either as data controllers or data processors, are responsible for any personal data they put in the cloud. The shared responsibility model illustrates the various responsibilities of AWS and our customers and APN partners, and the same separation of responsibility applies under the GDPR.
AWS responsibilities as a data processor
The GDPR does introduce specific regulation and responsibilities regarding data controllers and processors. When any AWS customer uses our services to process personal data, the controller is usually the AWS customer (and sometimes it is the AWS customer’s customer). However, in all of these cases, AWS is always the data processor in relation to this activity. This is because the customer is directing the processing of data through its interaction with the AWS service controls, and AWS is only executing customer directions. As a data processor, AWS is responsible for protecting the global infrastructure that runs all of our services. Controllers using AWS maintain control over data hosted on this infrastructure, including the security configuration controls for handling end-user content and personal data. Protecting this infrastructure, is our number one priority, and we invest heavily in third-party auditors to test our security controls and make any issues they find available to our customer base through AWS Artifact. Our ISO 27018 report is a good example, as it tests security controls that focus on protection of personal data in particular.
AWS has an increased responsibility for our managed services. Examples of managed services include Amazon DynamoDB, Amazon RDS, Amazon Redshift, Amazon Elastic MapReduce, and Amazon WorkSpaces. These services provide the scalability and flexibility of cloud-based resources with less operational overhead because we handle basic security tasks like guest operating system (OS) and database patching, firewall configuration, and disaster recovery. For most managed services, you only configure logical access controls and protect account credentials, while maintaining control and responsibility of any personal data.
Customer and APN partner responsibilities as data controllers — and how AWS Services can help
Our customers can act as data controllers or data processors within their AWS environment. As a data controller, the services you use may determine how you configure those services to help meet your GDPR compliance needs. For example, AWS Services that are classified as Infrastructure as a Service (IaaS), such as Amazon EC2, Amazon VPC, and Amazon S3, are under your control and require you to perform all routine security configuration and management that would be necessary no matter where the servers were located. With Amazon EC2 instances, you are responsible for managing: guest OS (including updates and security patches), application software or utilities installed on the instances, and the configuration of the AWS-provided firewall (called a security group).
To help you realize data protection by design principles under the GDPR when using our infrastructure, we recommend you protect AWS account credentials and set up individual user accounts with Amazon Identity and Access Management (IAM) so that each user is only given the permissions necessary to fulfill their job duties. We also recommend using multi-factor authentication (MFA) with each account, requiring the use of SSL/TLS to communicate with AWS resources, setting up API/user activity logging with AWS CloudTrail, and using AWS encryption solutions, along with all default security controls within AWS Services. You can also use advanced managed security services, such as Amazon Macie, which assists in discovering and securing personal data stored in Amazon S3.
For more information, you can download the AWS Security Best Practices whitepaper or visit the AWS Security Resources or GDPR Center webpages. In addition to our solutions and services, AWS APN partners can provide hundreds of tools and features to help you meet your security objectives, ranging from network security and configuration management to access control and data encryption.
If you’ve used Amazon CloudWatch Events to schedule the invocation of a Lambda function at regular intervals, you may have noticed that the highest frequency possible is one invocation per minute. However, in some cases, you may need to invoke Lambda more often than that. In this blog post, I’ll cover invoking a Lambda function every 10 seconds, but with some simple math you can change to whatever interval you like.
To achieve this, I’ll show you how to leverage Step Functions and Amazon Kinesis Data Streams.
The Solution
For this example, I’ve created a Step Functions State Machine that invokes our Lambda function 6 times, 10 seconds apart. Such State Machine is then executed once per minute by a CloudWatch Events Rule. This state machine is then executed once per minute by an Amazon CloudWatch Events rule. Finally, the Kinesis Data Stream triggers our Lambda function for each record inserted. The result is our Lambda function being invoked every 10 seconds, indefinitely.
Below is a diagram illustrating how the various services work together.
Step 1: My sampleLambda function doesn’t actually do anything, it just simulates an execution for a few seconds. This is the (Python) code of my dummy function:
import time
import random
def lambda_handler(event, context):
rand = random.randint(1, 3)
print('Running for {} seconds'.format(rand))
time.sleep(rand)
return True
Step 2:
The next step is to create a second Lambda function, that I called Iterator, which has two duties:
It keeps track of the current number of iterations, since Step Function doesn’t natively have a state we can use for this purpose.
It asynchronously invokes our Lambda function at every loops.
This is the code of the Iterator, adapted from here.
The state machine starts and sets the index at 0 and the count at 6.
Iterator function is invoked.
If the iterator function reached the end of the loop, the IsCountReached state terminates the execution, otherwise the machine waits for 10 seconds.
The machine loops back to the iterator.
Step 3: Create an Amazon CloudWatch Events rule scheduled to trigger every minute and add the state machine as its target. I’ve actually prepared an Amazon CloudFormation template that creates the whole stack and starts the Lambda invocations, you can find it here.
Performance
Let’s have a look at a sample series of invocations and analyse how precise the timing is. In the following chart I reported the delay (in excess of the expected 10-second-wait) of 30 consecutive invocations of my dummy function, when the Iterator is configured with a memory size of 1024MB.
Invocations Delay
Notice the delay increases by a few hundred milliseconds at every invocation. The good news is it accrues only within the same loop, 6 times; after that, a new CloudWatch Events kicks in and it resets.
This delay is due to the work that AWS Step Function does outside of the Wait state, the main component of which is the Iterator function itself, that runs synchronously in the state machine and therefore adds up its duration to the 10-second-wait.
As we can easily imagine, the memory size of the Iterator Lambda function does make a difference. Here are the Average and Maximum duration of the function with 256MB, 512MB, 1GB and 2GB of memory.
Average Duration
Maximum Duration
Given those results, I’d say that a memory of 1024MB is a good compromise between costs and performance.
Caveats
As mentioned, in our Amazon CloudWatch Events documentation, in rare cases a rule can be triggered twice, causing two parallel executions of the state machine. If that is a concern, we can add a task state at the beginning of the state machine that checks if any other executions are currently running. If the outcome is positive, then a choice state can immediately terminate the flow. Since the state machine is invoked every 60 seconds and runs for about 50, it is safe to assume that executions should all be sequential and any parallel executions should be treated as duplicates. The task state that checks for current running executions can be a Lambda function similar to the following:
AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easier to prepare and load your data for analytics. You can create and run an ETL job with a few clicks on the AWS Management Console. Just point AWS Glue to your data store. AWS Glue discovers your data and stores the associated metadata (for example, a table definition and schema) in the AWS Glue Data Catalog.
AWS Glue has native connectors to data sources using JDBC drivers, either on AWS or elsewhere, as long as there is IP connectivity. In this post, we demonstrate how to connect to data sources that are not natively supported in AWS Glue today. We walk through connecting to and running ETL jobs against two such data sources, IBM DB2 and SAP Sybase. However, you can use the same process with any other JDBC-accessible database.
AWS Glue data sources
AWS Glue natively supports the following data stores by using the JDBC protocol:
One of the fastest growing architectures deployed on AWS is the data lake. The ETL processes that are used to ingest, clean, transform, and structure data are critically important for this architecture. Having the flexibility to interoperate with a broader range of database engines allows for a quicker adoption of the data lake architecture.
For data sources that AWS Glue doesn’t natively support, such as IBM DB2, Pivotal Greenplum, SAP Sybase, or any other relational database management system (RDBMS), you can import custom database connectors from Amazon S3 into AWS Glue jobs. In this case, the connection to the data source must be made from the AWS Glue script to extract the data, rather than using AWS Glue connections. To learn more, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.
Setting up an ETL job for an IBM DB2 data source
The first example demonstrates how to connect the AWS Glue ETL job to an IBM DB2 instance, transform the data from the source, and store it in Apache Parquet format in Amazon S3. To successfully create the ETL job using an external JDBC driver, you must define the following:
The S3 location of the job script
The S3 location of the temporary directory
The S3 location of the JDBC driver
The S3 location of the Parquet data (output)
The IAM role for the job
By default, AWS Glue suggests bucket names for the scripts and the temporary directory using the following format:
Keep in mind that having the AWS Glue job and S3 buckets in the same AWS Region helps save on cross-Region data transfer fees. For this post, we will work in the US East (Ohio) Region (us-east-2).
Creating the IAM role
The next step is to set up the IAM role that the ETL job will use:
Sign in to the AWS Management Console, and search for IAM:
On the IAM console, choose Roles in the left navigation pane.
Choose Create role. The role type of trusted entity must be an AWS service, specifically AWS Glue.
Choose Next: Permissions.
Search for the AWSGlueServiceRole policy, and select it.
Search again, now for the SecretsManagerReadWrite This policy allows the AWS Glue job to access database credentials that are stored in AWS Secrets Manager.
CAUTION: This policy is open and is being used for testing purposes only. You should create a custom policy to narrow the access just to the secrets that you want to use in the ETL job.
Select this policy, and choose Next: Review.
Give your role a name, for example, GluePermissions, and confirm that both policies were selected.
Choose Create role.
Now that you have created the IAM role, it’s time to upload the JDBC driver to the defined location in Amazon S3. For this example, we will use the DB2 driver, which is available on the IBM Support site.
Storing database credentials
It is a best practice to store database credentials in a safe store. In this case, we use AWS Secrets Manager to securely store credentials. Follow these steps to create those credentials:
Open the console, and search for Secrets Manager.
In the AWS Secrets Manager console, choose Store a new secret.
Under Select a secret type, choose Other type of secrets.
In the Secret key/value, set one row for each of the following parameters:
db_username
db_password
db_url (for example, jdbc:db2://10.10.12.12:50000/SAMPLE)
db_table
driver_name (ibm.db2.jcc.DB2Driver)
output_bucket: (for example, aws-glue-data-output-1234567890-us-east-2/User)
Choose Next.
For Secret name, use DB2_Database_Connection_Info.
Choose Next.
Keep the Disable automatic rotation check box selected.
Choose Next.
Choose Store.
Adding a job in AWS Glue
The next step is to author the AWS Glue job, following these steps:
In the AWS Management Console, search for AWS Glue.
In the navigation pane on the left, choose Jobs under the ETL
Choose Add job.
Fill in the basic Job properties:
Give the job a name (for example, db2-job).
Choose the IAM role that you created previously (GluePermissions).
For This job runs, choose A new script to be authored by you.
For ETL language, choose Python.
In the Script libraries and job parameters section, choose the location of your JDBC driver for Dependent jars path.
Choose Next.
On the Connections page, choose Next
On the summary page, choose Save job and edit script. This creates the job and opens the script editor.
In the editor, replace the existing code with the following script. Important: Line 47 of the script corresponds to the mapping of the fields in the source table to the destination, dropping of the null fields to save space in the Parquet destination, and finally writing to Amazon S3 in Parquet format.
Choose the black X on the right side of the screen to close the editor.
Running the ETL job
Now that you have created the job, the next step is to execute it as follows:
On the Jobs page, select your new job. On the Action menu, choose Run job, and confirm that you want to run the job. Wait a few moments as it finishes the execution.
After the job shows as Succeeded, choose Logs to read the output of the job.
In the output of the job, you will find the result of executing the df.printSchema() and the message with the df.count().
Also, if you go to your output bucket in S3, you will find the Parquet result of the ETL job.
Using AWS Glue, you have created an ETL job that connects to an existing database using an external JDBC driver. It enables you to execute any transformation that you need.
Setting up an ETL job for an SAP Sybase data source
In this section, we describe how to create an AWS Glue ETL job against an SAP Sybase data source. The process mentioned in the previous section works for a Sybase data source with a few changes required in the job:
While creating the job, choose the correct jar for the JDBC dependency.
In the script, change the reference to the secret to be used from AWS Secrets Manager:
After you successfully execute the new ETL job, the output contains the same type of information that was generated with the DB2 data source.
Note that each of these JDBC drivers has its own nuances and different licensing terms that you should be aware of before using them.
Maximizing JDBC read parallelism
Something to keep in mind while working with big data sources is the memory consumption. In some cases, “Out of Memory” errors are generated when all the data is read into a single executor. One approach to optimize this is to rely on the parallelism on read that you can implement with Apache Spark and AWS Glue. To learn more, see the Apache Spark SQL module.
You can use the following options:
partitionColumn: The name of an integer column that is used for partitioning.
lowerBound: The minimum value of partitionColumn that is used to decide partition stride.
upperBound: The maximum value of partitionColumn that is used to decide partition stride.
numPartitions: The number of partitions. This, along with lowerBound (inclusive) and upperBound (exclusive), form partition strides for generated WHERE clause expressions used to split the partitionColumn When unset, this defaults to SparkContext.defaultParallelism.
Those options specify the parallelism of the table read. lowerBound and upperBound decide the partition stride, but they don’t filter the rows in the table. Therefore, Spark partitions and returns all rows in the table. For example:
It’s important to be careful with the number of partitions because too many partitions could also result in Spark crashing your external database systems.
Conclusion
Using the process described in this post, you can connect to and run AWS Glue ETL jobs against any data source that can be reached using a JDBC driver. This includes new generations of common analytical databases like Greenplum and others.
You can improve the query efficiency of these datasets by using partitioning and pushdown predicates. For more information, see Managing Partitions for ETL Output in AWS Glue. This technique opens the door to moving data and feeding data lakes in hybrid environments.
Kapil Shardha is a Technical Account Manager and supports enterprise customers with their AWS adoption. He has background in infrastructure automation and DevOps.
William Torrealba is an AWS Solutions Architect supporting customers with their AWS adoption. He has background in Application Development, High Available Distributed Systems, Automation, and DevOps.
Join us this month to learn about some of the exciting new services and solution best practices at AWS. We also have our first re:Invent 2018 webinar series, “How to re:Invent”. Sign up now to learn more, we look forward to seeing you.
Note – All sessions are free and in Pacific Time.
Tech talks featured this month:
Analytics & Big Data
May 21, 2018 | 11:00 AM – 11:45 AM PT – Integrating Amazon Elasticsearch with your DevOps Tooling – Learn how you can easily integrate Amazon Elasticsearch Service into your DevOps tooling and gain valuable insight from your log data.
May 24, 2018 | 11:00 AM – 11:45 AM PT – Data Transformation Patterns in AWS – Discover how to perform common data transformations on the AWS Data Lake.
May 30, 2018 | 01:00 PM – 01:45 PM PT – Accelerating Life Sciences with HPC on AWS – Learn how you can accelerate your Life Sciences research workloads by harnessing the power of high performance computing on AWS.
Containers
May 24, 2018 | 01:00 PM – 01:45 PM PT –Building Microservices with the 12 Factor App Pattern on AWS – Learn best practices for building containerized microservices on AWS, and how traditional software design patterns evolve in the context of containers.
Databases
May 21, 2018 | 01:00 PM – 01:45 PM PT – How to Migrate from Cassandra to Amazon DynamoDB – Get the benefits, best practices and guides on how to migrate your Cassandra databases to Amazon DynamoDB.
May 23, 2018 | 01:00 PM – 01:45 PM PT – 5 Hacks for Optimizing MySQL in the Cloud – Learn how to optimize your MySQL databases for high availability, performance, and disaster resilience using RDS.
DevOps
May 23, 2018 | 09:00 AM – 09:45 AM PT – .NET Serverless Development on AWS – Learn how to build a modern serverless application in .NET Core 2.0.
Enterprise & Hybrid
May 22, 2018 | 11:00 AM – 11:45 AM PT – Hybrid Cloud Customer Use Cases on AWS – Learn how customers are leveraging AWS hybrid cloud capabilities to easily extend their datacenter capacity, deliver new services and applications, and ensure business continuity and disaster recovery.
IoT
May 31, 2018 | 11:00 AM – 11:45 AM PT – Using AWS IoT for Industrial Applications – Discover how you can quickly onboard your fleet of connected devices, keep them secure, and build predictive analytics with AWS IoT.
Machine Learning
May 22, 2018 | 09:00 AM – 09:45 AM PT – Using Apache Spark with Amazon SageMaker – Discover how to use Apache Spark with Amazon SageMaker for training jobs and application integration.
May 24, 2018 | 09:00 AM – 09:45 AM PT – Introducing AWS DeepLens – Learn how AWS DeepLens provides a new way for developers to learn machine learning by pairing the physical device with a broad set of tutorials, examples, source code, and integration with familiar AWS services.
May 30, 2018 | 09:00 AM – 09:45 AM PT– Introducing AWS Certificate Manager Private Certificate Authority (CA) – Learn how AWS Certificate Manager (ACM) Private Certificate Authority (CA), a managed private CA service, helps you easily and securely manage the lifecycle of your private certificates.
June 1, 2018 | 09:00 AM – 09:45 AM PT – Introducing AWS Firewall Manager – Centrally configure and manage AWS WAF rules across your accounts and applications.
May 30, 2018 | 11:00 AM – 11:45 AM PT – Accelerate Productivity by Computing at the Edge – Learn how AWS Snowball Edge support for compute instances helps accelerate data transfers, execute custom applications, and reduce overall storage costs.
Bad software is everywhere. One can even claim that every software is bad. Cool companies, tech giants, established companies, all produce bad software. And no, yours is not an exception.
Who’s to blame for bad software? It’s all complicated and many factors are intertwined – there’s business requirements, there’s organizational context, there’s lack of sufficient skilled developers, there’s the inherent complexity of software development, there’s leaky abstractions, reliance on 3rd party software, consequences of wrong business and purchase decisions, time limitations, flawed business analysis, etc. So yes, despite the catchy title, I’m aware it’s actually complicated.
But in every “it’s complicated” scenario, there’s always one or two factors that are decisive. All of them contribute somehow, but the major drivers are usually a handful of things. And in the case of base software, I think it’s the fault of technical people. Developers, architects, ops.
We don’t seem to care about best practices. And I’ll do some nasty generalizations here, but bear with me. We can spend hours arguing about tabs vs spaces, curly bracket on new line, git merge vs rebase, which IDE is better, which framework is better and other largely irrelevant stuff. But we tend to ignore the important aspects that span beyond the code itself. The context in which the code lives, the non-functional requirements – robustness, security, resilience, etc.
We don’t seem to get security. Even trivial stuff such as user authentication is almost always implemented wrong. These days Twitter and GitHub realized they have been logging plain-text passwords, for example, but that’s just the tip of the iceberg. Too often we ignore the security implications.
“But the business didn’t request the security features”, one may say. The business never requested 2-factor authentication, encryption at rest, PKI, secure (or any) audit trail, log masking, crypto shredding, etc., etc. Because the business doesn’t know these things – we do and we have to put them on the backlog and fight for them to be implemented. Each organization has its specifics and tech people can influence the backlog in different ways, but almost everywhere we can put things there and prioritize them.
The other aspect is testing. We should all be well aware by now that automated testing is mandatory. We have all the tools in the world for unit, functional, integration, performance and whatnot testing, and yet many software projects lack the necessary test coverage to be able to change stuff without accidentally breaking things. “But testing takes time, we don’t have it”. We are perfectly aware that testing saves time, as we’ve all had those “not again!” recurring bugs. And yet we think of all sorts of excuses – “let the QAs test it”, we have to ship that now, we’ll test it later”, “this is too trivial to be tested”, etc.
And you may say it’s not our job. We don’t define what has do be done, we just do it. We don’t define the budget, the scope, the features. We just write whatever has been decided. And that’s plain wrong. It’s not our job to make money out of our code, and it’s not our job to define what customers need, but apart from that everything is our job. The way the software is structured, the security aspects and security features, the stability of the code base, the way the software behaves in different environments. The non-functional requirements are our job, and putting them on the backlog is our job.
You’ve probably heard that every software becomes “legacy” after 6 months. And that’s because of us, our sloppiness, our inability to mitigate external factors and constraints. Too often we create a mess through “just doing our job”.
And of course that’s a generalization. I happen to know a lot of great professionals who don’t make these mistakes, who strive for excellence and implement things the right way. But our industry as a whole doesn’t. Our industry as a whole produces bad software. And it’s our fault, as developers – as the only people who know why a certain piece of software is bad.
In a talk of his, Bob Martin warns us of the risks of our sloppiness. We have been building websites so far, but we are more and more building stuff that interacts with the real world, directly and indirectly. Ultimately, lives may depend on our software (like the recent unfortunate death caused by a self-driving car). And I’ll agree with Uncle Bob that it’s high time we self-regulate as an industry, before some technically incompetent politician decides to do that.
How, I don’t know. We’ll have to think more about it. But I’m pretty sure it’s our fault that software is bad, and no amount of blaming the management, the budget, the timing, the tools or the process can eliminate our responsibility.
Why do I insist on bashing my fellow software engineers? Because if we start looking at software development with more responsibility; with the fact that if it fails, it’s our fault, then we’re more likely to get out of our current bug-ridden, security-flawed, fragile software hole and really become the experts of the future.
Открих забавен проблем с python и multiprocessing, който в момента още не мога да реша чий проблем е (в крайна сметка ще се окаже мой). Отне ми прилично количество време да го хвана и си струва да го разкажа.
Малко предистория: ползваме influxdb, в което тъпчем бая секундни данни, които после предъвкваме до минутни. InfluxDB има continuous queries, които вършат тази работа – на някакъв интервал от време хващат новите данни и ги сгъват. Тези заявки имаха няколко проблема: – не се оправят с попълване на стари данни; – изпълняват се рядко и минутните данни изостават; – изпълняват се в общи линии в един thread, което кара минутните данни да изостават още повече (в нашия случай преди да ги сменим с около 12 часа).
Хванаха ме дяволите и си написах просто демонче на python, което да събира информация за различните бази какви данни могат да се сгънат, и паралелно да попълва данните. Работи в общи линии по следния начин: – взима списък с базите данни – пуска през multiprocessing-а да се събере за всяка база какви заявки трябва да се пуснат, на база на какви measurement-и има и докога са минутните и секундните данни в тях; – пуска през multiprocessing-а събраните от предния pass заявки – и така до края на света (или докато зависне).
След като навакса за няколко часа, успяваше да държи минутните данни в рамките на няколко минути от последните секундни данни, което си беше сериозно подобрение на ситуацията. Единственият проблем беше, че от време на време спираше да process-ва и увисваше.
Днес намерих време да го прегледам внимателно какво му се случва. Процесът изглежда като един parent и 5 fork()-нати child-а, като: Parent-а спи във futex 0x22555a0; Child 18455 във futex 0x7fdbfa366000; Child 18546 read Child 18457 във futex 0x7fdbfa366000 Child 18461 във futex 0x7fdbfa366000 Child 18462 във futex 0x7fdbfa366000 Child 18465 във futex 0x7fdbf908c2c0
Това не беше особено полезно, и се оказа, че стандартния python debugger (pdb) не може да се закача за съществуващи процеси, но за сметка на това gdb с подходящи debug символи може, и може да дава доста полезна информация. По този начин открих, че parent-а чака един child да приключи работата си:
#11 PyEval_EvalFrameEx ( [email protected]=Frame 0x235fb80, for file /usr/lib64/python2.7/multiprocessing/pool.py, line 543, in wait (self== 1525137960000000000 AND time < 1525138107000000000 GROUP BY time(1m), * fill(linear)\' in a read only context, please use a POST request instead', u'level': u'warning'}], u'statement_id': 0}]}, None], _callback=None, _chunksize=1, _number_left=1, _ready=False, _success=True, _cond=<_Condition(_Verbose__verbose=False, _Condition__lock=, acquire=, _Condition__waiters=[], release=) at remote 0x7fdbe0015310>, _job=45499, _cache={45499: < ...>}) a...(truncated), [email protected]=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
Като в pool.py около ред 543 има следното:
class ApplyResult(object):
...
def wait(self, timeout=None): self._cond.acquire() try: if not self._ready: self._cond.wait(timeout) finally: self._cond.release()
Първоначално си мислех, че 18546 очаква да прочете нещо от грешното място, но излезе, че това е child-а, който е спечелил състезанието за изпълняване на следващата задача и чака да му я дадат (което изглежда се раздава през futex 0x7fdbfa366000). Един от child-овете обаче чака в друг lock:
(gdb) bt #0 __lll_lock_wait () at ../nptl/sysdeps/unix/sysv/linux/x86_64/lowlevellock.S:135 #1 0x00007fdbf9b68dcb in _L_lock_812 () from /lib64/libpthread.so.0 #2 0x00007fdbf9b68c98 in __GI___pthread_mutex_lock ([email protected]=0x7fdbf908c2c0 ) at ../nptl/pthread_mutex_lock.c:79 #3 0x00007fdbf8e846ea in _nss_files_gethostbyname4_r ([email protected]=0x233fa44 "localhost", [email protected]=0x7fdbecfcb8e0, [email protected]=0x7fdbecfcb340 "hZ \372\333\177", [email protected]=1064, [email protected]=0x7fdbecfcb8b0, [email protected]=0x7fdbecfcb910, [email protected]=0x0) at nss_files/files-hosts.c:381 #4 0x00007fdbf9170ed8 in gaih_inet (name=, [email protected]=0x233fa44 "localhost", service=, [email protected]=0x7fdbecfcbb90, [email protected]=0x7fdbecfcb9f0, [email protected]=0x7fdbecfcb9e0) at ../sysdeps/posix/getaddrinfo.c:877 #5 0x00007fdbf91745cd in __GI_getaddrinfo ([email protected]=0x233fa44 "localhost", [email protected]=0x7fdbecfcbbc0 "8086", [email protected]=0x7fdbecfcbb90, [email protected]=0x7fdbecfcbb78) at ../sysdeps/posix/getaddrinfo.c:2431 #6 0x00007fdbeed8760d in socket_getaddrinfo (self=, args=) at /usr/src/debug/Python-2.7.5/Modules/socketmodule.c:4193 #7 0x00007fdbf9e5fbb0 in call_function (oparg=, pp_stack=0x7fdbecfcbd10) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4408 #8 PyEval_EvalFrameEx ( [email protected]=Frame 0x7fdbe8013350, for file /usr/lib/python2.7/site-packages/urllib3/util/connection.py, line 64, in create_connection (address=('localhost', 8086), timeout=3000, source_address=None, socket_options=[(6, 1, 1)], host='localhost', port=8086, err=None), [email protected]=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
(gdb) frame 3 #3 0x00007fdbf8e846ea in _nss_files_gethostbyname4_r ([email protected]=0x233fa44 "localhost", [email protected]=0x7fdbecfcb8e0, [email protected]=0x7fdbecfcb340 "hZ \372\333\177", [email protected]=1064, [email protected]=0x7fdbecfcb8b0, [email protected]=0x7fdbecfcb910, [email protected]=0x0) at nss_files/files-hosts.c:381 381 __libc_lock_lock (lock); (gdb) list 376 enum nss_status 377 _nss_files_gethostbyname4_r (const char *name, struct gaih_addrtuple **pat, 378 char *buffer, size_t buflen, int *errnop, 379 int *herrnop, int32_t *ttlp) 380 { 381 __libc_lock_lock (lock); 382 383 /* Reset file pointer to beginning or open file. */ 384 enum nss_status status = internal_setent (keep_stream); 385
Или в превод – опитваме се да вземем стандартния lock, който libc-то използва за да си пази reentrant функциите, и някой го държи. Кой ли?
… което е и съвсем очаквано, при условие, че са два процеса и тая памет не е обща.
Та, явно това, което се е случило е, че докато parent-а е правел fork(), тоя lock го е държал някой, и child-а реално не може да пипне каквото и да е, свързано с него (което значи никакви reentrant функции в glibc-то, каквито па всички ползват (и би трябвало да ползват)). Въпросът е, че по принцип това не би трябвало да е възможно, щото около fork() няма нищо, което да взима тоя lock, и би трябвало glibc да си освобождава lock-а като излиза от функциите си.
Първоначалното ми идиотско предположение беше, че в signal handler-а на SIGCHLD multiprocessing модула създава новите child-ове, и така докато нещо друго държи lock-а идва сигнал, прави се нов процес и той го “наследява” заключен. Това беше твърде глупаво, за да е истина, и се оказа, че не е…
Около въпросите с lock-а бях стигнал с търсене до две неща – issue 127 в gperftools и Debian bug 657835. Първото каза, че проблемът ми може да е от друг lock, който някой друг държи преди fork-а (което ме накара да се загледам по-внимателно какви lock-ове се държат), а второто, че като цяло ако fork-ваш thread-нато приложение, може после единствено да правиш execve(), защото всичко друго не е ясно колко ще работи.
И накрая се оказа, че ако се ползва multiprocessing модула, той пуска в главния процес няколко thread-а, които да се занимават със следенето и пускането на child-ове за обработка. Та ето какво реално се случва:
– някой child си изработва нужния брой операции и излиза – parent-а получава SIGCHLD и си отбелязва, че трябва да види какво става – главния thread на parent-а тръгва да събира списъка бази, и вика в някакъв момент _nss_files_gethostbyname4_r, който взима lock-а; – по това време другия thread казва “а, нямам достатъчно child-ове, fork()” – profit.
Текущото ми глупаво решение е да не правя нищо в главния thread, което може да взима тоя lock и да се надявам, че няма още някой такъв. Бъдещото ми решение е или да го пиша на python3 с някой друг модул по темата, или на go (което ще трябва да науча).
AWS Config enables continuous monitoring of your AWS resources, making it simple to assess, audit, and record resource configurations and changes. AWS Config does this through the use of rules that define the desired configuration state of your AWS resources. AWS Config provides a number of AWS managed rules that address a wide range of security concerns such as checking if you encrypted your Amazon Elastic Block Store (Amazon EBS) volumes, tagged your resources appropriately, and enabled multi-factor authentication (MFA) for root accounts. You can also create custom rules to codify your compliance requirements through the use of AWS Lambda functions.
In this post we’ll show you how to use AWS Config to monitor our Amazon Simple Storage Service (S3) bucket ACLs and policies for violations which allow public read or public write access. If AWS Config finds a policy violation, we’ll have it trigger an Amazon CloudWatch Event rule to trigger an AWS Lambda function which either corrects the S3 bucket ACL, or notifies you via Amazon Simple Notification Service (Amazon SNS) that the policy is in violation and allows public read or public write access. We’ll show you how to do this in five main steps.
Enable AWS Config to monitor Amazon S3 bucket ACLs and policies for compliance violations.
Create an IAM Role and Policy that grants a Lambda function permissions to read S3 bucket policies and send alerts through SNS.
Create and configure a CloudWatch Events rule that triggers the Lambda function when AWS Config detects an S3 bucket ACL or policy violation.
Create a Lambda function that uses the IAM role to review S3 bucket ACLs and policies, correct the ACLs, and notify your team of out-of-compliance policies.
Verify the monitoring solution.
Note: This post assumes your compliance policies require the buckets you monitor not allow public read or write access. If you have intentionally open buckets serving static content, for example, you can use this post as a jumping-off point for a solution tailored to your needs.
At the end of this post, we provide an AWS CloudFormation template that implements the solution outlined. The template enables you to deploy the solution in multiple regions quickly.
Important: The use of some of the resources deployed, including those deployed using the provided CloudFormation template, will incur costs as long as they are in use. AWS Config Rules incur costs in each region they are active.
Architecture
Here’s an architecture diagram of what we’ll implement:
Figure 1: Architecture diagram
Step 1: Enable AWS Config and Amazon S3 Bucket monitoring
The following steps demonstrate how to set up AWS Config to monitor Amazon S3 buckets.
If this is your first time using AWS Config, select Get started. If you’ve already used AWS Config, select Settings.
In the Settings page, under Resource types to record, clear the All resources checkbox. In the Specific types list, select Bucket under S3.
Figure 2: The Settings dialog box showing the “Specific types” list
Choose the Amazon S3 bucket for storing configuration history and snapshots. We’ll create a new Amazon S3 bucket.
Figure 3: Creating an S3 bucket
If you prefer to use an existing Amazon S3 bucket in your account, select the Choose a bucket from your account radio button and, using the dropdown, select an existing bucket.
Figure 4: Selecting an existing S3 bucket
Under Amazon SNS topic, check the box next to Stream configuration changes and notifications to an Amazon SNS topic, and then select the radio button to Create a topic.
Alternatively, you can choose a topic that you have previously created and subscribed to.
Figure 5: Selecting a topic that you’ve previously created and subscribed to
If you created a new SNS topic you need to subscribe to it to receive notifications. We’ll cover this in a later step.
Under AWS Config role, choose Create a role (unless you already have a role you want to use). We’re using the auto-suggested role name.
Figure 6: Creating a role
Select Next.
Configure Amazon S3 bucket monitoring rules:
On the AWS Config rules page, search for S3 and choose the s3-bucket-publice-read-prohibited and s3-bucket-public-write-prohibited rules, then click Next.
Figure 7: AWS Config rules dialog
On the Review page, select Confirm. AWS Config is now analyzing your Amazon S3 buckets, capturing their current configurations, and evaluating the configurations against the rules we selected.
If you created a new Amazon SNS topic, open the Amazon SNS Management Console and locate the topic you created:
Figure 8: Amazon SNS topic list
Copy the ARN of the topic (the string that begins with arn:) because you’ll need it in a later step.
Select the checkbox next to the topic, and then, under the Actions menu, select Subscribe to topic.
Select Email as the protocol, enter your email address, and then select Create subscription.
After several minutes, you’ll receive an email asking you to confirm your subscription for notifications for this topic. Select the link to confirm the subscription.
Step 2: Create a Role for Lambda
Our Lambda will need permissions that enable it to inspect and modify Amazon S3 bucket ACLs and policies, log to CloudWatch Logs, and publishing to an Amazon SNS topic. We’ll now set up a custom AWS Identity and Access Management (IAM) policy and role to support these actions and assign them to the Lambda function we’ll create in the next section.
In the AWS Management Console, under Services, select IAM to access the IAM Console.
Create a policy with the following permissions, or copy the following policy:
Select Lambda from the list of services that will use this role.
Select the check box next to the policy you created previously, and then select Next: Review
Name your role, give it a description, and then select Create Role. In this example, we’re naming the role LambdaS3PolicySecuringRole.
Step 3: Create and Configure a CloudWatch Rule
In this section, we’ll create a CloudWatch Rule to trigger the Lambda function when AWS Config determines that your Amazon S3 buckets are non-compliant.
In the AWS Management Console, under Services, select CloudWatch.
On the left-hand side, under Events, select Rules.
Click Create rule.
In Step 1: Create rule, under Event Source, select the dropdown list and select Build custom event pattern.
Copy the following pattern and paste it into the text box:
The pattern matches events generated by AWS Config when it checks the Amazon S3 bucket for public accessibility.
We’ll add a Lambda target later. For now, select your Amazon SNS topic created earlier, and then select Configure details.
Figure 9: The “Create rule” dialog
Give your rule a name and description. For this example, we’ll name ours AWSConfigFoundOpenBucket
Click Create rule.
Step 4: Create a Lambda Function
In this section, we’ll create a new Lambda function to examine an Amazon S3 bucket’s ACL and bucket policy. If the bucket ACL is found to allow public access, the Lambda function overwrites it to be private. If a bucket policy is found, the Lambda function creates an SNS message, puts the policy in the message body, and publishes it to the Amazon SNS topic we created. Bucket policies can be complex, and overwriting your policy may cause unexpected loss of access, so this Lambda function doesn’t attempt to alter your policy in any way.
Get the ARN of the Amazon SNS topic created earlier.
In the AWS Management Console, under Services, select Lambda to go to the Lambda Console.
From the Dashboard, select Create Function. Or, if you were taken directly to the Functions page, select the Create Function button in the upper-right.
On the Create function page:
Choose Author from scratch.
Provide a name for the function. We’re using AWSConfigOpenAccessResponder.
The Lambda function we’ve written is Python 3.6 compatible, so in the Runtime dropdown list, select Python 3.6.
Under Role, select Choose an existing role. Select the role you created in the previous section, and then select Create function.
Figure 10: The “Create function” dialog
We’ll now add a CloudWatch Event based on the rule we created earlier.
In the Add triggers section, select CloudWatch Events. A CloudWatch Events box should appear connected to the left side of the Lambda Function and have a note that states Configuration required.
Figure 11: CloudWatch Events in the “Add triggers” section
From the Rule dropdown box, choose the rule you created earlier, and then select Add.
Scroll up to the Designer section and select the name of your Lambda function.
Delete the default code and paste in the following code:
import boto3
from botocore.exceptions import ClientError
import json
import os
ACL_RD_WARNING = "The S3 bucket ACL allows public read access."
PLCY_RD_WARNING = "The S3 bucket policy allows public read access."
ACL_WRT_WARNING = "The S3 bucket ACL allows public write access."
PLCY_WRT_WARNING = "The S3 bucket policy allows public write access."
RD_COMBO_WARNING = ACL_RD_WARNING + PLCY_RD_WARNING
WRT_COMBO_WARNING = ACL_WRT_WARNING + PLCY_WRT_WARNING
def policyNotifier(bucketName, s3client):
try:
bucketPolicy = s3client.get_bucket_policy(Bucket = bucketName)
# notify that the bucket policy may need to be reviewed due to security concerns
sns = boto3.client('sns')
subject = "Potential compliance violation in " + bucketName + " bucket policy"
message = "Potential bucket policy compliance violation. Please review: " + json.dumps(bucketPolicy['Policy'])
# send SNS message with warning and bucket policy
response = sns.publish(
TopicArn = os.environ['TOPIC_ARN'],
Subject = subject,
Message = message
)
except ClientError as e:
# error caught due to no bucket policy
print("No bucket policy found; no alert sent.")
def lambda_handler(event, context):
# instantiate Amazon S3 client
s3 = boto3.client('s3')
resource = list(event['detail']['requestParameters']['evaluations'])[0]
bucketName = resource['complianceResourceId']
complianceFailure = event['detail']['requestParameters']['evaluations'][0]['annotation']
if(complianceFailure == ACL_RD_WARNING or complianceFailure == PLCY_RD_WARNING):
s3.put_bucket_acl(Bucket = bucketName, ACL = 'private')
elif(complianceFailure == PLCY_RD_WARNING or complianceFailure == PLCY_WRT_WARNING):
policyNotifier(bucketName, s3)
elif(complianceFailure == RD_COMBO_WARNING or complianceFailure == WRT_COMBO_WARNING):
s3.put_bucket_acl(Bucket = bucketName, ACL = 'private')
policyNotifier(bucketName, s3)
return 0 # done
Scroll down to the Environment variables section. This code uses an environment variable to store the Amazon SNS topic ARN.
For the key, enter TOPIC_ARN.
For the value, enter the ARN of the Amazon SNS topic created earlier.
Under Execution role, select Choose an existing role, and then select the role created earlier from the dropdown.
Leave everything else as-is, and then, at the top, select Save.
Step 5: Verify it Works
We now have the Lambda function, an Amazon SNS topic, AWS Config watching our Amazon S3 buckets, and a CloudWatch Rule to trigger the Lambda function if a bucket is found to be non-compliant. Let’s test them to make sure they work.
We have an Amazon S3 bucket, myconfigtestbucket that’s been created in the region monitored by AWS Config, as well as the associated Lambda function. This bucket has no public read or write access set in an ACL or a policy, so it’s compliant.
Figure 12: The “Config Dashboard”
Let’s change the bucket’s ACL to allow public listing of objects:
Figure 13: Screen shot of “Permissions” tab showing Everyone granted list access
After saving, the bucket now has public access. After several minutes, the AWS Config Dashboard notes that there is one non-compliant resource:
Figure 14: The “Config Dashboard” shown with a non-compliant resource
In the Amazon S3 Console, we can see that the bucket no longer has public listing of objects enabled after the invocation of the Lambda function triggered by the CloudWatch Rule created earlier.
Figure 15: The “Permissions” tab showing list access no longer allowed
Notice that the AWS Config Dashboard now shows that there are no longer any non-compliant resources:
Figure 16: The “Config Dashboard” showing zero non-compliant resources
Now, let’s try out the Amazon S3 bucket policy check by configuring a bucket policy that allows list access:
Figure 17: A bucket policy that allows list access
A few minutes after setting this bucket policy on the myconfigtestbucket bucket, AWS Config recognizes the bucket is no longer compliant. Because this is a bucket policy rather than an ACL, we publish a notification to the SNS topic we created earlier that lets us know about the potential policy violation:
Figure 18: Notification about potential policy violation
Knowing that the policy allows open listing of the bucket, we can now modify or delete the policy, after which AWS Config will recognize that the resource is compliant.
Conclusion
In this post, we demonstrated how you can use AWS Config to monitor for Amazon S3 buckets with open read and write access ACLs and policies. We also showed how to use Amazon CloudWatch, Amazon SNS, and Lambda to overwrite a public bucket ACL, or to alert you should a bucket have a suspicious policy. You can use the CloudFormation template to deploy this solution in multiple regions quickly. With this approach, you will be able to easily identify and secure open Amazon S3 bucket ACLs and policies. Once you have deployed this solution to multiple regions you can aggregate the results using an AWS Config aggregator. See this post to learn more.
If you have feedback about this blog post, submit comments in the Comments section below. If you have questions about this blog post, start a new thread on the AWS Config forum or contact AWS Support.
Want more AWS Security news? Follow us on Twitter.
The Internet of Things (IoT) has precipitated to an influx of connected devices and data that can be mined to gain useful business insights. If you own an IoT device, you might want the data to be uploaded seamlessly from your connected devices to the cloud so that you can make use of cloud storage and the processing power to perform sophisticated analysis of data. To upload the data to the AWS Cloud, devices must pass authentication and authorization checks performed by the respective AWS services. The standard way of authenticating AWS requests is the Signature Version 4 algorithm that requires the caller to have an access key ID and secret access key. Consequently, you need to hardcode the access key ID and the secret access key on your devices. Alternatively, you can use the built-in X.509 certificate as the unique device identity to authenticate AWS requests.
AWS IoT has introduced the credentials provider feature that allows a caller to authenticate AWS requests by having an X.509 certificate. The credentials provider authenticates a caller using an X.509 certificate, and vends a temporary, limited-privilege security token. The token can be used to sign and authenticate any AWS request. Thus, the credentials provider relieves you from having to manage and periodically refresh the access key ID and secret access key remotely on your devices.
In the process of retrieving a security token, you use AWS IoT to create a thing (a representation of a specific device or logical entity), register a certificate, and create AWS IoT policies. You also configure an AWS Identity and Access Management (IAM) role and attach appropriate IAM policies to the role so that the credentials provider can assume the role on your behalf. You also make an HTTP-over-Transport Layer Security (TLS) mutual authentication request to the credentials provider that uses your preconfigured thing, certificate, policies, and IAM role to authenticate and authorize the request, and obtain a security token on your behalf. You can then use the token to sign any AWS request using Signature Version 4.
In this blog post, I explain the AWS IoT credentials provider design and then demonstrate the end-to-end process of retrieving a security token from AWS IoT and using the token to write a temperature and humidity record to a specific Amazon DynamoDB table.
Note: This post assumes you are familiar with AWS IoT and IAM to perform steps using the AWS CLI and OpenSSL. Make sure you are running the latest version of the AWS CLI.
Overview of the credentials provider workflow
The following numbered diagram illustrates the credentials provider workflow. The diagram is followed by explanations of the steps.
To explain the steps of the workflow as illustrated in the preceding diagram:
The AWS IoT device uses the AWS SDK or custom client to make an HTTPS request to the credentials provider for a security token. The request includes the device X.509 certificate for authentication.
The credentials provider forwards the request to the AWS IoT authentication and authorization module to verify the certificate and the permission to request the security token.
If the certificate is valid and has permission to request a security token, the AWS IoT authentication and authorization module returns success. Otherwise, it returns failure, which goes back to the device with the appropriate exception.
If assuming the role succeeds, AWS STS returns a temporary, limited-privilege security token to the credentials provider.
The credentials provider returns the security token to the device.
The AWS SDK on the device uses the security token to sign an AWS request with AWS Signature Version 4.
The requested service invokes IAM to validate the signature and authorize the request against access policies attached to the preconfigured IAM role.
If IAM validates the signature successfully and authorizes the request, the request goes through.
In another solution, you could configure an AWS Lambda rule that ingests your device data and sends it to another AWS service. However, in applications that require the uploading of large files such as videos or aggregated telemetry to the AWS Cloud, you may want your devices to be able to authenticate and send data directly to the AWS service of your choice. The credentials provider enables you to do that.
Outline of the steps to retrieve and use security token
Perform the following steps as part of this solution:
Create an AWS IoT thing: Start by creating a thing that corresponds to your home thermostat in the AWS IoT thing registry database. This allows you to authenticate the request as a thing and use thing attributes as policy variables in AWS IoT and IAM policies.
Register a certificate: Create and register a certificate with AWS IoT, and attach it to the thing for successful device authentication.
Create and configure an IAM role: Create an IAM role to be assumed by the service on behalf of your device. I illustrate how to configure a trust policy and an access policy so that AWS IoT has permission to assume the role, and the token has necessary permission to make requests to DynamoDB.
Create a role alias: Create a role alias in AWS IoT. A role alias is an alternate data model pointing to an IAM role. The credentials provider request must include a role alias name to indicate which IAM role to assume for obtaining a security token from AWS STS. You may update the role alias on the server to point to a different IAM role and thus make your device obtain a security token with different permissions.
Attach a policy: Create an authorization policy with AWS IoT and attach it to the certificate to control which device can assume which role aliases.
Request a security token: Make an HTTPS request to the credentials provider and retrieve a security token and use it to sign a DynamoDB request with Signature Version 4.
Use the security token to sign a request: Use the retrieved token to sign a request to DynamoDB and successfully write a temperature and humidity record from your home thermostat in a specific table. Thus, starting with an X.509 certificate on your home thermostat, you can successfully upload your thermostat record to DynamoDB and use it for further analysis. Before the availability of the credentials provider, you could not do this.
Deploy the solution
1. Create an AWS IoT thing
Register your home thermostat in the AWS IoT thing registry database by creating a thing type and a thing. You can use the AWS CLI with the following command to create a thing type. The thing type allows you to store description and configuration information that is common to a set of things.
Now, you need to have a Certificate Authority (CA) certificate, sign a device certificate using the CA certificate, and register both certificates with AWS IoT before your device can authenticate to AWS IoT. If you do not already have a CA certificate, you can use OpenSSL to create a CA certificate, as described in Use Your Own Certificate. To register your CA certificate with AWS IoT, follow the steps on Registering Your CA Certificate.
You then have to create a device certificate signed by the CA certificate and register it with AWS IoT, which you can do by following the steps on Creating a Device Certificate Using Your CA Certificate. Save the certificate and the corresponding key pair; you will use them when you request a security token later. Also, remember the password you provide when you create the certificate.
Run the following command in the AWS CLI to attach the device certificate to your thing so that you can use thing attributes in policy variables.
If the attach-thing-principal command succeeds, the output is empty.
3. Configure an IAM role
Next, configure an IAM role in your AWS account that will be assumed by the credentials provider on behalf of your device. You are required to associate two policies with the role: a trust policy that controls who can assume the role, and an access policy that controls which actions can be performed on which resources by assuming the role.
The following trust policy grants the credentials provider permission to assume the role. Put it in a text document and save the document with the name, trustpolicyforiot.json.
The following access policy allows DynamoDB operations on the table that has the same name as the thing name that you created in Step 1, MyHomeThermostat, by using credentials-iot:ThingName as a policy variable. I explain after Step 5 about using thing attributes as policy variables. Put the following policy in a text document and save the document with the name, accesspolicyfordynamodb.json.
Finally, run the following command in the AWS CLI to attach the access policy to your role.
aws iam attach-role-policy --role-name dynamodb-access-role --policy-arn arn:aws:iam::<your_aws_account_id>:policy/accesspolicyfordynamodb
If the attach-role-policy command succeeds, the output is empty.
Configure the PassRole permissions
The IAM role that you have created must be passed to AWS IoT to create a role alias, as described in Step 4. The user who performs the operation requires iam:PassRole permission to authorize this action. You also should add permission for the iam:GetRole action to allow the user to retrieve information about the specified role. Create the following policy to grant iam:PassRole and iam:GetRole permissions. Name this policy, passrolepermission.json.
Now, run the following command to attach the policy to the user.
aws iam attach-user-policy --policy-arn arn:aws:iam::<your_aws_account_id>:policy/passrolepermission --user-name <user_name>
If the attach-user-policy command succeeds, the output is empty.
4. Create a role alias
Now that you have configured the IAM role, you will create a role alias with AWS IoT. You must provide the following pieces of information when creating a role alias:
RoleAlias: This is the primary key of the role alias data model and hence a mandatory attribute. It is a string; the minimum length is 1 character, and the maximum length is 128 characters.
RoleArn: This is the Amazon Resource Name (ARN) of the IAM role you have created. This is also a mandatory attribute.
CredentialDurationSeconds: This is an optional attribute specifying the validity (in seconds) of the security token. The minimum value is 900 seconds (15 minutes), and the maximum value is 3,600 seconds (60 minutes); the default value is 3,600 seconds, if not specified.
Run the following command in the AWS CLI to create a role alias. Use the credentials of the user to whom you have given the iam:PassRole permission.
You created and registered a certificate with AWS IoT earlier for successful authentication of your device. Now, you need to create and attach a policy to the certificate to authorize the request for the security token.
Let’s say you want to allow a thing to get credentials for the role alias, Thermostat-dynamodb-access-role-alias, with thing owner Alice, thing type thermostat, and the thing attached to a principal. The following policy, with thing attributes as policy variables, achieves these requirements. After this step, I explain more about using thing attributes as policy variables. Put the policy in a text document, and save it with the name, alicethermostatpolicy.json.
If the attach-policy command succeeds, the output is empty.
You have completed all the necessary steps to request an AWS security token from the credentials provider!
Using thing attributes as policy variables
Before I show how to request a security token, I want to explain more about how to use thing attributes as policy variables and the advantage of using them. As a prerequisite, a device must provide a thing name in the credentials provider request.
Thing substitution variables in AWS IoT policies
AWS IoT Simplified Permission Management allows you to associate a connection with a specific thing, and allow the thing name, thing type, and other thing attributes to be available as substitution variables in AWS IoT policies. You can write a generic AWS IoT policy as in alicethermostatpolicy.json in Step 5, attach it to multiple certificates, and authorize the connection as a thing. For example, you could attach alicethermostatpolicy.json to certificates corresponding to each of the thermostats you have that you want to assume the role alias, Thermostat-dynamodb-access-role-alias, and allow operations only on the table with the name that matches the thing name. For more information, see the full list of thing policy variables.
Thing substitution variables in IAM policies
You also can use the following three substitution variables in the IAM role’s access policy (I used credentials-iot:ThingName in accesspolicyfordynamodb.json in Step 3):
credentials-iot:ThingName
credentials-iot:ThingTypeName
credentials-iot:AwsCertificateId
When the device provides the thing name in the request, the credentials provider fetches these three variables from the database and adds them as context variables to the security token. When the device uses the token to access DynamoDB, the variables in the role’s access policy are replaced with the corresponding values in the security token. Note that you also can use credentials-iot:AwsCertificateId as a policy variable; AWS IoT returns certificateId during registration.
6. Request a security token
Make an HTTPS request to the credentials provider to fetch a security token. You have to supply the following information:
Certificate and key pair: Because this is an HTTP request over TLS mutual authentication, you have to provide the certificate and the corresponding key pair to your client while making the request. Use the same certificate and key pair that you used during certificate registration with AWS IoT.
RoleAlias: Provide the role alias (in this example, Thermostat-dynamodb-access-role-alias) to be assumed in the request.
ThingName: Provide the thing name that you created earlier in the AWS IoT thing registry database. This is passed as a header with the name, x-amzn-iot-thingname. Note that the thing name is mandatory only if you have thing attributes as policy variables in AWS IoT or IAM policies.
Run the following command in the AWS CLI to obtain your AWS account-specific endpoint for the credentials provider. See the DescribeEndpoint API documentation for further details.
Note that if you are on Mac OS X, you need to export your certificate to a .pfx or .p12 file before you can pass it in the https request. Use OpenSSL with the following command to convert the device certificate from .pem to .pfx format. Remember the password because you will need it subsequently in a curl command.
Now, make an HTTPS request to the credentials provider to fetch a security token. You may use your preferred HTTP client for the request. I use curl in the following examples.
This command returns a security token object that has an accessKeyId, a secretAccessKey, a sessionToken, and an expiration. The following is sample output of the curl command.
Create a DynamoDB table called MyHomeThermostat in your AWS account. You will have to choose the hash (partition key) and the range (sort key) while creating the table to uniquely identify a record. Make the hash the serial_number of the thermostat and the range the timestamp of the record. Create a text file with the following JSON to put a temperature and humidity record in the table. Name the file, item.json.
You can use the accessKeyId, secretAccessKey, and sessionToken retrieved from the output of the curl command to sign a request that writes the temperature and humidity record to the DynamoDB table. Use the following commands to accomplish this.
In this blog post, I demonstrated how to retrieve a security token by using an X.509 certificate and then writing an item to a DynamoDB table by using the security token. Similarly, you could run applications on surveillance cameras or sensor devices that exchange the X.509 certificate for an AWS security token and use the token to upload video streams to Amazon Kinesis or telemetry data to Amazon CloudWatch.
If you have comments about this blog post, submit them in the “Comments” section below. If you have questions about or issues implementing this solution, start a new thread on the AWS IoT forum.
This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.
Healthcare data is generated at an ever-increasing rate and is predicted to reach 35 zettabytes by 2020. Being able to cost-effectively and securely manage this data whether for patient care, research or legal reasons is increasingly important for healthcare providers.
Healthcare providers must have the ability to ingest, store and protect large volumes of data including clinical, genomic, device, financial, supply chain, and claims. AWS is well-suited to this data deluge with a wide variety of ingestion, storage and security services (e.g. AWS Direct Connect, Amazon Kinesis Streams, Amazon S3, Amazon Macie) for customers to handle their healthcare data. In a recent Healthcare IT News article, healthcare thought-leader, John Halamka, noted, “I predict that five years from now none of us will have datacenters. We’re going to go out to the cloud to find EHRs, clinical decision support, analytics.”
I realize simply storing this data is challenging enough. Magnifying the problem is the fact that healthcare data is increasingly attractive to cyber attackers, making security a top priority. According to Mariya Yao in her Forbes column, it is estimated that individual medical records can be worth hundreds or even thousands of dollars on the black market.
In this first of a 2-part post, I will address the value that AWS can bring to customers for ingesting, storing and protecting provider’s healthcare data. I will describe key components of any cloud-based healthcare workload and the services AWS provides to meet these requirements. In part 2 of this post we will dive deep into the AWS services used for advanced analytics, artificial intelligence and machine learning.
The data tsunami is upon us
So where is this data coming from? In addition to the ubiquitous electronic health record (EHR), the sources of this data include:
genomic sequencers
devices such as MRIs, x-rays and ultrasounds
sensors and wearables for patients
medical equipment telemetry
mobile applications
Additional sources of data come from non-clinical, operational systems such as:
human resources
finance
supply chain
claims and billing
Data from these sources can be structured (e.g., claims data) as well as unstructured (e.g., clinician notes). Some data comes across in streams such as that taken from patient monitors, while some comes in batch form. Still other data comes in near-real time such as HL7 messages. All of this data has retention policies dictating how long it must be stored. Much of this data is stored in perpetuity as many systems in use today have no purge mechanism. AWS has services to manage all these data types as well as their retention, security and access policies.
Imaging is a significant contributor to this data tsunami. Increasing demand for early-stage diagnoses along with aging populations drive increasing demand for images from CT, PET, MRI, ultrasound, digital pathology, X-ray and fluoroscopy. For example, a thin-slice CT image can be hundreds of megabytes. Increasing demand and strict retention policies make storage costly.
Due to the plummeting cost of gene sequencing, molecular diagnostics (including liquid biopsy) is a large contributor to this data deluge. Many predict that as the value of molecular testing becomes more identifiable, the reimbursement models will change and it will increasingly become the standard of care. According to the Washington Post article “Sequencing the Genome Creates so Much Data We Don’t Know What to do with It,”
“Some researchers predict that up to one billion people will have their genome sequenced by 2025 generating up to 40 exabytes of data per year.”
Although genomics is primarily used for oncology diagnostics today, it’s also used for other purposes, pharmacogenomics — used to understand how an individual will metabolize a medication.
Reference Architecture
It is increasingly challenging for the typical hospital, clinic or physician practice to securely store, process and manage this data without cloud adoption.
Amazon has a variety of ingestion techniques depending on the nature of the data including size, frequency and structure. AWS Snowball and AWS Snowmachine are appropriate for extremely-large, secure data transfers whether one time or episodic. AWS Glue is a fully-managed ETL service for securely moving data from on-premise to AWS and Amazon Kinesis can be used for ingesting streaming data.
Amazon S3, Amazon S3 IA, and Amazon Glacier are economical, data-storage services with a pay-as-you-go pricing model that expand (or shrink) with the customer’s requirements.
The above architecture has four distinct components – ingestion, storage, security, and analytics. In this post I will dive deeper into the first three components, namely ingestion, storage and security. In part 2, I will look at how to use AWS’ analytics services to draw value on, and optimize, your healthcare data.
Ingestion
A typical provider data center will consist of many systems with varied datasets. AWS provides multiple tools and services to effectively and securely connect to these data sources and ingest data in various formats. The customers can choose from a range of services and use them in accordance with the use case.
For use cases involving one-time (or periodic), very large data migrations into AWS, customers can take advantage of AWS Snowball devices. These devices come in two sizes, 50 TB and 80 TB and can be combined together to create a petabyte scale data transfer solution.
The devices are easy to connect and load and they are shipped to AWS avoiding the network bottlenecks associated with such large-scale data migrations. The devices are extremely secure supporting 256-bit encryption and come in a tamper-resistant enclosure. AWS Snowball imports data in Amazon S3 which can then interface with other AWS compute services to process that data in a scalable manner.
For use cases involving a need to store a portion of datasets on premises for active use and offload the rest on AWS, the Amazon storage gateway service can be used. The service allows you to seamlessly integrate on premises applications via standard storage protocols like iSCSI or NFS mounted on a gateway appliance. It supports a file interface, a volume interface and a tape interface which can be utilized for a range of use cases like disaster recovery, backup and archiving, cloud bursting, storage tiering and migration.
The AWS Storage Gateway appliance can use the AWS Direct Connect service to establish a dedicated network connection from the on premises data center to AWS.
Specific Industry Use Cases
By using the AWS proposed reference architecture for disaster recovery, healthcare providers can ensure their data assets are securely stored on the cloud and are easily accessible in the event of a disaster. The “AWS Disaster Recovery” whitepaper includes details on options available to customers based on their desired recovery time objective (RTO) and recovery point objective (RPO).
AWS is an ideal destination for offloading large volumes of less-frequently-accessed data. These datasets are rarely used in active compute operations but are exceedingly important to retain for reasons like compliance. By storing these datasets on AWS, customers can take advantage of the highly-durable platform to securely store their data and also retrieve them easily when they need to. For more details on how AWS enables customers to run back and archival use cases on AWS, please refer to the following set of whitepapers.
A healthcare provider may have a variety of databases spread throughout the hospital system supporting critical applications such as EHR, PACS, finance and many more. These datasets often need to be aggregated to derive information and calculate metrics to optimize business processes. AWS Glue is a fully-managed Extract, Transform and Load (ETL) service that can read data from a JDBC-enabled, on-premise database and transfer the datasets into AWS services like Amazon S3, Amazon Redshift and Amazon RDS. This allows customers to create transformation workflows that integrate smaller datasets from multiple sources and aggregates them on AWS.
Healthcare providers deal with a variety of streaming datasets which often have to be analyzed in near real time. These datasets come from a variety of sources such as sensors, messaging buses and social media, and often do not adhere to an industry standard. The Amazon Kinesis suite of services, that includes Amazon Kinesis Streams, Amazon Kinesis Firehose, and Amazon Kinesis Analytics, are the ideal set of services to accomplish the task of deriving value from streaming data.
Example: Using AWS Glue to de-identify and ingest healthcare data into S3 Let’s consider a scenario in which a provider maintains patient records in a database they want to ingest into S3. The provider also wants to de-identify the data by stripping personally- identifiable attributes and store the non-identifiable information in an S3 bucket. This bucket is different from the one that contains identifiable information. Doing this allows the healthcare provider to separate sensitive information with more restrictions set up via S3 bucket policies.
To ingest records into S3, we create a Glue job that reads from the source database using a Glue connection. The connection is also used by a Glue crawler to populate the Glue data catalog with the schema of the source database. We will use the Glue development endpoint and a zeppelin notebook server on EC2 to develop and execute the job.
Step 1: Import the necessary libraries and also set a glue context which is a wrapper on the spark context:
Step 2: Create a dataframe from the source data. I call the dataframe “readmissionsdata”. Here is what the schema would look like:
Step 3: Now select the columns that contains indentifiable information and store it in a new dataframe. Call the new dataframe “phi”.
Step 4: Non-PHI columns are stored in a separate dataframe. Call this dataframe “nonphi”.
Step 5: Write the two dataframes into two separate S3 buckets
Once successfully executed, the PHI and non-PHI attributes are stored in two separate files in two separate buckets that can be individually maintained.
Storage
In 2016, 327 healthcare providers reported a protected health information (PHI) breach, affecting 16.4m patient records[1]. There have been 342 data breaches reported in 2017 — involving 3.2 million patient records.[2]
To date, AWS has released 51 HIPAA-eligible services to help customers address security challenges and is in the process of making many more services HIPAA-eligible. These HIPAA-eligible services (along with all other AWS services) help customers build solutions that comply with HIPAA security and auditing requirements. A catalogue of HIPAA-enabled services can be found at AWS HIPAA-eligible services. It is important to note that AWS manages physical and logical access controls for the AWS boundary. However, the overall security of your workloads is a shared responsibility, where you are responsible for controlling user access to content on your AWS accounts.
AWS storage services allow you to store data efficiently while maintaining high durability and scalability. By using Amazon S3 as the central storage layer, you can take advantage of the Amazon S3 storage management features to get operational metrics on your data sets and transition them between various storage classes to save costs. By tagging objects on Amazon S3, you can build a governance layer on Amazon S3 to grant role based access to objects using Amazon IAM and Amazon S3 bucket policies.
To learn more about the Amazon S3 storage management features, see the following link.
Security
In the example above, we are storing the PHI information in a bucket named “phi.” Now, we want to protect this information to make sure its encrypted, does not have unauthorized access, and all access requests to the data are logged.
Encryption: S3 provides settings to enable default encryption on a bucket. This ensures any object in the bucket is encrypted by default.
Logging: S3 provides object level logging that can be used to capture all API calls to the object. The API calls are logged in cloudtrail for easy access and consolidation. Moreover, it also supports events to proactively alert customers of read and write operations.
Access control: Customers can use S3 bucket policies and IAM policies to restrict access to the phi bucket. It can also put a restriction to enforce multi-factor authentication on the bucket. For example, the following policy enforces multi-factor authentication on the phi bucket:
In Part 1 of this blog, we detailed the ingestion, storage, security and management of healthcare data on AWS. Stay tuned for part two where we are going to dive deep into optimizing the data for analytics and machine learning.
This post courtesy of Massimiliano Angelino, AWS Solutions Architect
Different enterprise systems—ERP, CRM, BI, HR, etc.—need to exchange information but normally cannot do that natively because they are from different vendors. Enterprises have tried multiple ways to integrate heterogeneous systems, generally referred to as enterprise application integration (EAI).
Modern EAI systems are based on a message-oriented middleware (MoM), also known as enterprise service bus (ESB). An ESB provides data communication via a message bus, on top of which it also provides components to orchestrate, route, translate, and monitor the data exchange. Communication with the ESB is done via adapters or connectors provided by the ESB. In this way, the different applications do not have to have specific knowledge of the technology used to provide the integration.
Amazon MQ used with Apache Camel is an open-source alternative to commercial ESBs. With the launch of Amazon MQ, integration between on-premises applications and cloud services becomes much simpler. Amazon MQ provides a managed message broker service currently supporting ApacheMQ 5.15.0.
In this post, I show how a simple integration between Amazon MQ and other AWS services can be achieved by using Apache Camel.
Apache Camel provides built-in connectors for integration with a wide variety of AWS services such as Amazon MQ, Amazon SQS, Amazon SNS, Amazon SWF, Amazon S3, AWS Lambda, Amazon DynamoDB, AWS Elastic Beanstalk, and Amazon Kinesis Streams. It also provides a broad range of other connectors including Cassandra, JDBC, Spark, and even Facebook and Slack.
EAI system architecture
Different applications use different data formats, hence the need for a translation/transformation service. Such services can be provided to or from a common “normalized” format, or specifically between two applications.
The use of normalized formats simplifies the integration process when multiple applications need to share the same data, as the number of conversions to be realized is N (number of applications). This is at the cost of a more complex adaptation to a common format, which is required to cover all needs from the different applications, current and future.
Another characteristic of an EAI system is the support of distributed transactions to ensure data consistency across multiple applications.
EAI system architecture is normally composed of the following components:
A centralized broker that handles security, access control, and data communications. Amazon MQ provides these features through the support of multiple transport protocols (AMQP, Openwire, MQTT, WebSocket), security (all communications are encrypted via SSL), and per destination granular access control.
An independent data model, also known as the canonical data model. XML is the de facto standard for the data representation.
Connectors/agents that allow the applications to communicate with the broker.
A system model to allow a standardized way for all components to interface with the EAI. Java Message Service (JMS) and Windows Communication Foundation (WCF) are standard APIs to interact with constructs such as queues and topics to implement the different messaging patterns.
Walkthrough
This solution walks you through the following steps:
Creating the broker
Writing a simple application
Adding the dependencies
Triaging files into S3
Writing the Camel route
Sending files to the AMQP queue
Setting up AMQP
Testing the code
Creating the broker
To create a new broker, log in to your AWS account and choose Amazon MQ. Amazon MQ is currently available in six AWS Regions:
US East (N. Virginia)
US East (Ohio)
US West (Oregon)
EU (Ireland)
EU (Frankfurt)
Asia Pacific (Sydney) regions.
Make sure that you have selected one of these Regions.
The master user name and password are used to access the monitoring console of the broker and can be also used to authenticate when connecting the clients to the broker. I recommend creating separate users, without console access, to authenticate the clients to the broker, after the broker has been created.
For this example, create a single broker without failover. If your application requires a higher availability level, check the Create standby in a different zone check box. In case the principal broker instance would fail, the standby takes over in seconds. To make the client aware of the standby, use the failover:// protocol in the connection configuration pointing to both broker endpoints.
Leave the other settings as is. The broker takes few minutes to be created. After it’s done, you can see the list of endpoints available for the different protocols.
After the broker has been created, modify the security group to add the allowed ports and sources for access.
For this example, you need access to the ActiveMQ admin page and to AMQP. Open up ports 8162 and 5671 to the public address of your laptop.
You can also create a new user for programmatic access to the broker. In the Users section, choose Create User and add a new user named sdk.
Writing a simple application
The complete code for this walkthrough is available from the aws-amazonmq-apachecamel-sample GitHub repo. Clone the repository on your local machine to have the fully functional example. The rest of this post offers step-by-step instructions to build this solution.
To write the application, use Apache Maven and the Camel archetypes provided by Maven. If you do not have Apache Maven installed on your machine, you can follow the instructions at Installing Apache Maven.
From a terminal, run the following command:
mvn archetype:generate
You get a list of archetypes. Type camel to get only the one related to camel. In this case, use the java8 example and type the following:
Maven now generates the skeleton code in a folder named as the artifactId. In this case:
camel-aws-simple
Next, test that the environment is configured correctly to run Camel. At the prompt, run the following commands:
cd camel-aws-simple
mvn install
mvn exec:java
You should see a log appearing in the console, printing the following:
[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ camel-aws-test ---
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Apache Camel 2.20.1 (CamelContext: camel-1) is starting
[ com.angmas.MainApp.main()] ManagedManagementStrategy INFO JMX is enabled
[ com.angmas.MainApp.main()] DefaultTypeConverter INFO Type converters loaded (core: 192, classpath: 0)
[ com.angmas.MainApp.main()] DefaultCamelContext INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Route: route1 started and consuming from: timer://simple?period=1000
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Total 1 routes, of which 1 are started
[ com.angmas.MainApp.main()] DefaultCamelContext INFO Apache Camel 2.20.1 (CamelContext: camel-1) started in 0.419 seconds
[-1) thread #2 - timer://simple] route1 INFO Got a String body
[-1) thread #2 - timer://simple] route1 INFO Got an Integer body
[-1) thread #2 - timer://simple] route1 INFO Got a Double body
[-1) thread #2 - timer://simple] route1 INFO Got a String body
[-1) thread #2 - timer://simple] route1 INFO Got an Integer body
[-1) thread #2 - timer://simple] route1 INFO Got a Double body
[-1) thread #2 - timer://simple] route1 INFO Got a String body
[-1) thread #2 - timer://simple] route1 INFO Got an Integer body
[-1) thread #2 - timer://simple] route1 INFO Got a Double body
Adding the dependencies
Now that you have verified that the sample works, modify it to add the dependencies to interface to Amazon MQ/ActiveMQ and AWS.
For the following steps, you can use a normal text editor, such as vi, Sublime Text, or Visual Studio Code. Or, open the maven project in an IDE such as Eclipse or IntelliJ IDEA.
Open pom.xml and add the following lines inside the <dependencies> tag:
The camel-aws component is taking care of the interface with the supported AWS services without requiring any in-depth knowledge of the AWS Java SDK. For more information, see Camel Components for Amazon Web Services.
Triaging files into S3
Write a Camel component that receives files as a payload to messages in a queue and write them to an S3 bucket with different prefixes depending on the extension.
Because the broker that you created is exposed via a public IP address, you can execute the code from anywhere that there is an internet connection that allows communication on the specific ports. In this example, run the code from your own laptop. A broker can also be created without public IP address, in which case it is only accessible from inside the VPC in which it has been created, or by any peered VPC or network connected via a virtual gateway (VPN or AWS Direct Connect).
First, look at the code created by Maven. The archetype chosen created a standalone Camel context run via the helper org.apache.camel.main.Main class. This provides an easy way to run Camel routes from an IDE or the command line without needing to deploy it inside a container. Apache Camel can be also run as an OSGi module, or Spring and SpringBoot bean.
package com.angmas;
import org.apache.camel.main.Main;
/**
* A Camel Application
*/
public class MainApp {
/**
* A main() so you can easily run these routing rules in your IDE
*/
public static void main(String... args) throws Exception {
Main main = new Main();
main.addRouteBuilder(new MyRouteBuilder());
main.run(args);
}
}
The main method instantiates the Camel Main helper class and the routes, and runs the Camel application. The MyRouteBuilder class creates a route using Java DSL. It is also possible to define routes in Spring XML and load them dynamically in the code.
public void configure() {
// this sample sets a random body then performs content-based
// routing on the message using method references
from("timer:simple?period=1000")
.process()
.message(m -> m.setHeader("index", index++ % 3))
.transform()
.message(this::randomBody)
.choice()
.when()
.body(String.class::isInstance)
.log("Got a String body")
.when()
.body(Integer.class::isInstance)
.log("Got an Integer body")
.when()
.body(Double.class::isInstance)
.log("Got a Double body")
.otherwise()
.log("Other type message");
}
Writing the Camel route
Replace the existing route with one that fetches messages from Amazon MQ over AMQP, and routes the content to different S3 buckets depending on the file name extension.
Reads messages from the AMQP queue named filequeue.
Processes the message and sets a new ext header using the setExtensionHeader method (see below).
Checks the value of the ext header and write the body of the message as an object in an S3 bucket using different key prefixes, retaining the original name of the file.
The Amazon S3 component is configured with the bucket name, and a reference to an S3 client (amazonS3client=#s3Client) that you added to the Camel registry in the Main method of the app. Adding the object to the Camel registry allows Camel to find the object at runtime. Even though you could pass the region, accessKey, and secretKey parameters directly in the component URI, this way is more secure. It can make use of EC2 instance roles, so that you never need to pass the secrets.
Sending files to the AMQP queue
To send the files to the AMQP queue for testing, add another Camel route. In a real scenario, the messages to the AMQP queue are generated by another client. You are going to create a new route builder, but you could also add this route inside the existing MyRouteBuilder.
package com.angmas;
import org.apache.camel.builder.RouteBuilder;
/**
* A Camel Java8 DSL Router
*/
public class MessageProducerBuilder extends RouteBuilder {
/**
* Configure the Camel routing rules using Java code...
*/
public void configure() {
from("file://input?delete=false&noop=true")
.log("Content ${body} ${headers.CamelFileName}")
.to("amqp:filequeue");
}
}
The code reads files from the input folder in the work directory and publishes it to the queue. The route builder is added in the main class:
By default, Camel tries to connect to a local AMQP broker. Configure it to connect to your Amazon MQ broker.
Create an AMQPConnectionDetails object that is configured to connect to Amazon MQ broker with SSL and pass the user name and password that you set on the broker. Adding the object to the Camel registry allows Camel to find the object at runtime and use it as the default connection to AMQP.
public class MainApp {
public static String BROKER_URL = System.getenv("BROKER_URL");
public static String AMQP_URL = "amqps://"+BROKER_URL+":5671";
public static String BROKER_USERNAME = System.getenv("BROKER_USERNAME");
public static String BROKER_PASSWORD = System.getenv("BROKER_PASSWORD");
/**
* A main() so you can easily run these routing rules in your IDE
*/
public static void main(String... args) throws Exception {
Main main = new Main();
main.bind("amqp", getAMQPconnection());
main.bind("s3Client", AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build());
main.addRouteBuilder(new MyRouteBuilder());
main.addRouteBuilder(new MessageProducerBuilder());
main.run(args);
}
public static AMQPConnectionDetails getAMQPconnection() {
return new AMQPConnectionDetails(AMQP_URL, BROKER_USERNAME, BROKER_PASSWORD);
}
}
The AMQP_URL uses the amqps schema that indicates that you are using SSL. You then add the component to the registry. Camel finds it by matching the class type. main.bind("amqp-ssl", getAMQPConnection());
Testing the code
Create an input folder in the project root, and create few files with different extensions, such as txt, html, and csv.
Set the different environment variables required by the code, either in the shell or in your IDE as execution configuration.
If you are running the example from an EC2 instance, ensure that the EC2 instance role has read permission on the S3 bucket.
If you are running this on your laptop, ensure that you have configured the AWS credentials in the environment, for example, by using the aws configure command.
From the command line, execute the code:
mvn exec:java
If you are using an IDE, execute the main class. Camel outputs logging information and you should see messages listing the content and names of the files in the input folder.
Keep adding some more files to the input folder. You see that they are triaged in S3 a few seconds later. You can open the S3 console to check that they have been created.
To stop Camel, press CTRL+C in the shell.
Conclusion
In this post, I showed you how to create a publicly accessible Amazon MQ broker, and how to use Apache Camel to easily integrate AWS services with the broker. In the example, you created a Camel route that reads messages containing files from the AMQP queue and triages them by file extension into an S3 bucket.
Camel supports several components and provides blueprints for several enterprise integration patterns. Used in combination with the Amazon MQ, it provides a powerful and flexible solution to extend traditional enterprise solutions to the AWS Cloud, and integrate them seamlessly with cloud-native services, such as Amazon S3, Amazon SNS, Amazon SQS, Amazon CloudWatch, and AWS Lambda.
To learn more, see the Amazon MQ website. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.
This post courtesy of George Mao, AWS Senior Serverless Specialist – Solutions Architect
AWS Lambda and AWS CodeDeploy recently made it possible to automatically shift incoming traffic between two function versions based on a preconfigured rollout strategy. This new feature allows you to gradually shift traffic to the new function. If there are any issues with the new code, you can quickly rollback and control the impact to your application.
Previously, you had to manually move 100% of traffic from the old version to the new version. Now, you can have CodeDeploy automatically execute pre- or post-deployment tests and automate a gradual rollout strategy. Traffic shifting is built right into the AWS Serverless Application Model (SAM), making it easy to define and deploy your traffic shifting capabilities. SAM is an extension of AWS CloudFormation that provides a simplified way of defining serverless applications.
In this post, I show you how to use SAM, CloudFormation, and CodeDeploy to accomplish an automated rollout strategy for safe Lambda deployments.
Scenario
For this walkthrough, you write a Lambda application that returns a count of the S3 buckets that you own. You deploy it and use it in production. Later on, you receive requirements that tell you that you need to change your Lambda application to count only buckets that begin with the letter “a”.
Before you make the change, you need to be sure that your new Lambda application works as expected. If it does have issues, you want to minimize the number of impacted users and roll back easily. To accomplish this, you create a deployment process that publishes the new Lambda function, but does not send any traffic to it. You use CodeDeploy to execute a PreTraffic test to ensure that your new function works as expected. After the test succeeds, CodeDeploy automatically shifts traffic gradually to the new version of the Lambda function.
Your Lambda function is exposed as a REST service via an Amazon API Gateway deployment. This makes it easy to test and integrate.
Prerequisites
To execute the SAM and CloudFormation deployment, you must have the following IAM permissions:
cloudformation:*
lambda:*
codedeploy:*
iam:create*
You may use the AWS SAM Local CLI or the AWS CLI to package and deploy your Lambda application. If you choose to use SAM Local, be sure to install it onto your system. For more information, see AWS SAM Local Installation.
For this post, use SAM to define your resources because it comes with built-in CodeDeploy support for safe Lambda deployments. The deployment is handled and automated by CloudFormation.
SAM allows you to define your Serverless applications in a simple and concise fashion, because it automatically creates all necessary resources behind the scenes. For example, if you do not define an execution role for a Lambda function, SAM automatically creates one. SAM also creates the CodeDeploy application necessary to drive the traffic shifting, as well as the IAM service role that CodeDeploy uses to execute all actions.
Create a SAM template
To get started, write your SAM template and call it template.yaml.
Review the key parts of the SAM template that defines returnS3Buckets:
The AutoPublishAlias attribute instructs SAM to automatically publish a new version of the Lambda function for each new deployment and link it to the live alias.
The Policies attribute specifies additional policy statements that SAM adds onto the automatically generated IAM role for this function. The first statement provides the function with permission to call listBuckets.
The DeploymentPreference attribute configures the type of rollout pattern to use. In this case, you are shifting traffic in a linear fashion, moving 10% of traffic every minute to the new version. For more information about supported patterns, see Serverless Application Model: Traffic Shifting Configurations.
The Hooks attribute specifies that you want to execute the preTrafficHook Lambda function before CodeDeploy automatically begins shifting traffic. This function should perform validation testing on the newly deployed Lambda version. This function invokes the new Lambda function and checks the results. If you’re satisfied with the tests, instruct CodeDeploy to proceed with the rollout via an API call to: codedeploy.putLifecycleEventHookExecutionStatus.
The Events attribute defines an API-based event source that can trigger this function. It accepts requests on the /test path using an HTTP GET method.
'use strict';
const AWS = require('aws-sdk');
const codedeploy = new AWS.CodeDeploy({apiVersion: '2014-10-06'});
var lambda = new AWS.Lambda();
exports.handler = (event, context, callback) => {
console.log("Entering PreTraffic Hook!");
// Read the DeploymentId & LifecycleEventHookExecutionId from the event payload
var deploymentId = event.DeploymentId;
var lifecycleEventHookExecutionId = event.LifecycleEventHookExecutionId;
var functionToTest = process.env.NewVersion;
console.log("Testing new function version: " + functionToTest);
// Perform validation of the newly deployed Lambda version
var lambdaParams = {
FunctionName: functionToTest,
InvocationType: "RequestResponse"
};
var lambdaResult = "Failed";
lambda.invoke(lambdaParams, function(err, data) {
if (err){ // an error occurred
console.log(err, err.stack);
lambdaResult = "Failed";
}
else{ // successful response
var result = JSON.parse(data.Payload);
console.log("Result: " + JSON.stringify(result));
// Check the response for valid results
// The response will be a JSON payload with statusCode and body properties. ie:
// {
// "statusCode": 200,
// "body": 51
// }
if(result.body == 9){
lambdaResult = "Succeeded";
console.log ("Validation testing succeeded!");
}
else{
lambdaResult = "Failed";
console.log ("Validation testing failed!");
}
// Complete the PreTraffic Hook by sending CodeDeploy the validation status
var params = {
deploymentId: deploymentId,
lifecycleEventHookExecutionId: lifecycleEventHookExecutionId,
status: lambdaResult // status can be 'Succeeded' or 'Failed'
};
// Pass AWS CodeDeploy the prepared validation test results.
codedeploy.putLifecycleEventHookExecutionStatus(params, function(err, data) {
if (err) {
// Validation failed.
console.log('CodeDeploy Status update failed');
console.log(err, err.stack);
callback("CodeDeploy Status update failed");
} else {
// Validation succeeded.
console.log('Codedeploy status updated successfully');
callback(null, 'Codedeploy status updated successfully');
}
});
}
});
}
The hook is hardcoded to check that the number of S3 buckets returned is 9.
Review the key parts of the SAM template that defines preTrafficHook:
The Policies attribute specifies additional policy statements that SAM adds onto the automatically generated IAM role for this function. The first statement provides permissions to call the CodeDeploy PutLifecycleEventHookExecutionStatus API action. The second statement provides permissions to invoke the specific version of the returnS3Buckets function to test
This function has traffic shifting features disabled by setting the DeploymentPreference option to false.
The FunctionName attribute explicitly tells CloudFormation what to name the function. Otherwise, CloudFormation creates the function with the default naming convention: [stackName]-[FunctionName]-[uniqueID]. Name the function with the “CodeDeployHook_” prefix because the CodeDeployServiceRole role only allows InvokeFunction on functions named with that prefix.
Set the Timeout attribute to allow enough time to complete your validation tests.
Use an environment variable to inject the ARN of the newest deployed version of the returnS3Buckets function. The ARN allows the function to know the specific version to invoke and perform validation testing on.
Deploy the function
Your SAM template is all set and the code is written—you’re ready to deploy the function for the first time. Here’s how to do it via the SAM CLI. Replace “sam” with “cloudformation” to use CloudFormation instead.
First, package the function. This command returns a CloudFormation importable file, packaged.yaml.
sam package –template-file template.yaml –s3-bucket mybucket –output-template-file packaged.yaml
Now deploy everything:
sam deploy –template-file packaged.yaml –stack-name mySafeDeployStack –capabilities CAPABILITY_IAM
At this point, both Lambda functions have been deployed within the CloudFormation stack mySafeDeployStack. The returnS3Buckets has been deployed as Version 1:
SAM automatically created a few things, including the CodeDeploy application, with the deployment pattern that you specified (Linear10PercentEvery1Minute). There is currently one deployment group, with no action, because no deployments have occurred. SAM also created the IAM service role that this CodeDeploy application uses:
There is a single managed policy attached to this role, which allows CodeDeploy to invoke any Lambda function that begins with “CodeDeployHook_”.
An API has been set up called safeDeployStack. It targets your Lambda function with the /test resource using the GET method. When you test the endpoint, API Gateway executes the returnS3Buckets function and it returns the number of S3 buckets that you own. In this case, it’s 51.
Publish a new Lambda function version
Now implement the requirements change, which is to make returnS3Buckets count only buckets that begin with the letter “a”. The code now looks like the following (see returnS3BucketsNew.js in GitHub):
'use strict';
var AWS = require('aws-sdk');
var s3 = new AWS.S3();
exports.handler = (event, context, callback) => {
console.log("I am here! " + context.functionName + ":" + context.functionVersion);
s3.listBuckets(function (err, data){
if(err){
console.log(err, err.stack);
callback(null, {
statusCode: 500,
body: "Failed!"
});
}
else{
var allBuckets = data.Buckets;
console.log("Total buckets: " + allBuckets.length);
//callback(null, allBuckets.length);
// New Code begins here
var counter=0;
for(var i in allBuckets){
if(allBuckets[i].Name[0] === "a")
counter++;
}
console.log("Total buckets starting with a: " + counter);
callback(null, {
statusCode: 200,
body: counter
});
}
});
}
Repackage and redeploy with the same two commands as earlier:
sam package –template-file template.yaml –s3-bucket mybucket –output-template-file packaged.yaml
sam deploy –template-file packaged.yaml –stack-name mySafeDeployStack –capabilities CAPABILITY_IAM
CloudFormation understands that this is a stack update instead of an entirely new stack. You can see that reflected in the CloudFormation console:
During the update, CloudFormation deploys the new Lambda function as version 2 and adds it to the “live” alias. There is no traffic routing there yet. CodeDeploy now takes over to begin the safe deployment process.
The first thing CodeDeploy does is invoke the preTrafficHook function. Verify that this happened by reviewing the Lambda logs and metrics:
The function should progress successfully, invoke Version 2 of returnS3Buckets, and finally invoke the CodeDeploy API with a success code. After this occurs, CodeDeploy begins the predefined rollout strategy. Open the CodeDeploy console to review the deployment progress (Linear10PercentEvery1Minute):
Verify the traffic shift
During the deployment, verify that the traffic shift has started to occur by running the test periodically. As the deployment shifts towards the new version, a larger percentage of the responses return 9 instead of 51. These numbers match the S3 buckets.
A minute later, you see 10% more traffic shifting to the new version. The whole process takes 10 minutes to complete. After completion, open the Lambda console and verify that the “live” alias now points to version 2:
After 10 minutes, the deployment is complete and CodeDeploy signals success to CloudFormation and completes the stack update.
Check the results
If you invoke the function alias manually, you see the results of the new implementation.
aws lambda invoke –function [lambda arn to live alias] out.txt
You can also execute the prod stage of your API and verify the results by issuing an HTTP GET to the invoke URL:
Summary
This post has shown you how you can safely automate your Lambda deployments using the Lambda traffic shifting feature. You used the Serverless Application Model (SAM) to define your Lambda functions and configured CodeDeploy to manage your deployment patterns. Finally, you used CloudFormation to automate the deployment and updates to your function and PreTraffic hook.
Now that you know all about this new feature, you’re ready to begin automating Lambda deployments with confidence that things will work as designed. I look forward to hearing about what you’ve built with the AWS Serverless Platform.
AWS Glue provides enhanced support for working with datasets that are organized into Hive-style partitions. AWS Glue crawlers automatically identify partitions in your Amazon S3 data. The AWS Glue ETL (extract, transform, and load) library natively supports partitions when you work with DynamicFrames. DynamicFrames represent a distributed collection of data without requiring you to specify a schema. You can now push down predicates when creating DynamicFrames to filter out partitions and avoid costly calls to S3. We have also added support for writing DynamicFrames directly into partitioned directories without converting them to Apache Spark DataFrames.
Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For example, you might decide to partition your application logs in Amazon S3 by date—broken down by year, month, and day. Files corresponding to a single day’s worth of data would then be placed under a prefix such as s3://my_bucket/logs/year=2018/month=01/day=23/.
Systems like Amazon Athena, Amazon Redshift Spectrum, and now AWS Glue can use these partitions to filter data by value without making unnecessary calls to Amazon S3. This can significantly improve the performance of applications that need to read only a few partitions.
In this post, we show you how to efficiently process partitioned datasets using AWS Glue. First, we cover how to set up a crawler to automatically scan your partitioned dataset and create a table and partitions in the AWS Glue Data Catalog. Then, we introduce some features of the AWS Glue ETL library for working with partitioned data. You can now filter partitions using SQL expressions or user-defined functions to avoid listing and reading unnecessary data from Amazon S3. We’ve also added support in the ETL library for writing AWS Glue DynamicFrames directly into partitions without relying on Spark SQL DataFrames.
Let’s get started!
Crawling partitioned data
In this example, we use the same GitHub archive dataset that we introduced in a previous post about Scala support in AWS Glue. This data, which is publicly available from the GitHub archive, contains a JSON record for every API request made to the GitHub service. A sample dataset containing one month of activity from January 2017 is available at the following location:
Here you can replace <region> with the AWS Region in which you are working, for example, us-east-1. This dataset is partitioned by year, month, and day, so an actual file will be at a path like the following:
To crawl this data, you can either follow the instructions in the AWS Glue Developer Guide or use the provided AWS CloudFormation template. This template creates a stack that contains the following:
An IAM role with permissions to access AWS Glue resources
A database in the AWS Glue Data Catalog named githubarchive_month
A crawler set up to crawl the GitHub dataset
An AWS Glue development endpoint (which is used in the next section to transform the data)
To run this template, you must provide an S3 bucket and prefix where you can write output data in the next section. The role that this template creates will have permission to write to this bucket only. You also need to provide a public SSH key for connecting to the development endpoint. For more information about creating an SSH key, see our Development Endpoint tutorial. After you create the AWS CloudFormation stack, you can run the crawler from the AWS Glue console.
In addition to inferring file types and schemas, crawlers automatically identify the partition structure of your dataset and populate the AWS Glue Data Catalog. This ensures that your data is correctly grouped into logical tables and makes the partition columns available for querying in AWS Glue ETL jobs or query engines like Amazon Athena.
After you crawl the table, you can view the partitions by navigating to the table in the AWS Glue console and choosing View partitions. The partitions should look like the following:
For partitioned paths in Hive-style of the form key=val, crawlers automatically populate the column name. In this case, because the GitHub data is stored in directories of the form 2017/01/01, the crawlers use default names like partition_0, partition_1, and so on. You can easily change these names on the AWS Glue console: Navigate to the table, choose Edit schema, and rename partition_0 to year, partition_1 to month, and partition_2 to day:
Now that you’ve crawled the dataset and named your partitions appropriately, let’s see how to work with partitioned data in an AWS Glue ETL job.
Transforming and filtering the data
To get started with the AWS Glue ETL libraries, you can use an AWS Glue development endpoint and an Apache Zeppelin notebook. AWS Glue development endpoints provide an interactive environment to build and run scripts using Apache Spark and the AWS Glue ETL library. They are great for debugging and exploratory analysis, and can be used to develop and test scripts before migrating them to a recurring job.
If you ran the AWS CloudFormation template in the previous section, then you already have a development endpoint named partition-endpoint in your account. Otherwise, you can follow the instructions in this development endpoint tutorial. In either case, you need to set up an Apache Zeppelin notebook, either locally, or on an EC2 instance. You can find more information about development endpoints and notebooks in the AWS Glue Developer Guide.
The following examples are all written in the Scala programming language, but they can all be implemented in Python with minimal changes.
Reading a partitioned dataset
To get started, let’s read the dataset and see how the partitions are reflected in the schema. First, you import some classes that you will need for this example and set up a GlueContext, which is the main class that you will use to read and write data.
Execute the following in a Zeppelin paragraph, which is a unit of executable code:
%spark
import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext
import java.util.Calendar
import java.util.GregorianCalendar
import scala.collection.JavaConversions._
@transient val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)
This is straightforward with two caveats: First, each paragraph must start with the line %spark to indicate that the paragraph is Scala. Second, the spark variable must be marked @transient to avoid serialization issues. This is only necessary when running in a Zeppelin notebook.
Next, read the GitHub data into a DynamicFrame, which is the primary data structure that is used in AWS Glue scripts to represent a distributed collection of data. A DynamicFrame is similar to a Spark DataFrame, except that it has additional enhancements for ETL transformations. DynamicFrames are discussed further in the post AWS Glue Now Supports Scala Scripts, and in the AWS Glue API documentation.
The following snippet creates a DynamicFrame by referencing the Data Catalog table that you just crawled and then prints the schema:
%spark
val githubEvents: DynamicFrame = glueContext.getCatalogSource(
database = "githubarchive_month",
tableName = "data"
).getDynamicFrame()
githubEvents.schema.asFieldList.foreach { field =>
println(s"${field.getName}: ${field.getType.getType.getName}")
}
You could also print the full schema using githubEvents.printSchema(). But in this case, the full schema is quite large, so I’ve printed only the top-level columns. This paragraph takes about 5 minutes to run on a standard size AWS Glue development endpoint. After it runs, you should see the following output:
Note that the partition columns year, month, and day were automatically added to each record.
Filtering by partition columns
One of the primary reasons for partitioning data is to make it easier to operate on a subset of the partitions, so now let’s see how to filter data by the partition columns. In particular, let’s find out what people are building in their free time by looking at GitHub activity on the weekends. One way to accomplish this is to use the filter transformation on the githubEvents DynamicFrame that you created earlier to select the appropriate events:
%spark
def filterWeekend(rec: DynamicRecord): Boolean = {
def getAsInt(field: String): Int = {
rec.getField(field) match {
case Some(strVal: String) => strVal.toInt
// The filter transformation will catch exceptions and mark the record as an error.
case _ => throw new IllegalArgumentException(s"Unable to extract field $field")
}
}
val (year, month, day) = (getAsInt("year"), getAsInt("month"), getAsInt("day"))
val cal = new GregorianCalendar(year, month - 1, day) // Calendar months start at 0.
val dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
dayOfWeek == Calendar.SATURDAY || dayOfWeek == Calendar.SUNDAY
}
val filteredEvents = githubEvents.filter(filterWeekend)
filteredEvents.count
This snippet defines the filterWeekend function that uses the Java Calendar class to identify those records where the partition columns (year, month, and day) fall on a weekend. If you run this code, you see that there were 6,303,480 GitHub events falling on the weekend in January 2017, out of a total of 29,160,561 events. This seems reasonable—about 22 percent of the events fell on the weekend, and about 29 percent of the days that month fell on the weekend (9 out of 31). So people are using GitHub slightly less on the weekends, but there is still a lot of activity!
Predicate pushdowns for partition columns
The main downside to using the filter transformation in this way is that you have to list and read all files in the entire dataset from Amazon S3 even though you need only a small fraction of them. This is manageable when dealing with a single month’s worth of data. But as you try to process more data, you will spend an increasing amount of time reading records only to immediately discard them.
To address this issue, we recently released support for pushing down predicates on partition columns that are specified in the AWS Glue Data Catalog. Instead of reading the data and filtering the DynamicFrame at executors in the cluster, you apply the filter directly on the partition metadata available from the catalog. Then you list and read only the partitions from S3 that you need to process.
To accomplish this, you can specify a Spark SQL predicate as an additional parameter to the getCatalogSource method. This predicate can be any SQL expression or user-defined function as long as it uses only the partition columns for filtering. Remember that you are applying this to the metadata stored in the catalog, so you don’t have access to other fields in the schema.
The following snippet shows how to use this functionality to read only those partitions occurring on a weekend:
%spark
val partitionPredicate =
"date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"
val pushdownEvents = glueContext.getCatalogSource(
database = "githubarchive_month",
tableName = "data",
pushDownPredicate = partitionPredicate).getDynamicFrame()
Here you use the SparkSQL string concat function to construct a date string. You use the to_date function to convert it to a date object, and the date_format function with the ‘E’ pattern to convert the date to a three-character day of the week (for example, Mon, Tue, and so on). For more information about these functions, Spark SQL expressions, and user-defined functions in general, see the Spark SQL documentation and list of functions.
Note that the pushdownPredicate parameter is also available in Python. The corresponding call in Python is as follows:
You can observe the performance impact of pushing down predicates by looking at the execution time reported for each Zeppelin paragraph. The initial approach using a Scala filter function took 2.5 minutes:
Because the version using a pushdown lists and reads much less data, it takes only 24 seconds to complete, a 5X improvement!
Of course, the exact benefit that you see depends on the selectivity of your filter. The more partitions that you exclude, the more improvement you will see.
In addition to Hive-style partitioning for Amazon S3 paths, Parquet and ORC file formats further partition each file into blocks of data that represent column values. Each block also stores statistics for the records that it contains, such as min/max for column values. AWS Glue supports pushdown predicates for both Hive-style partitions and block partitions in these formats. While reading data, it prunes unnecessary S3 partitions and also skips the blocks that are determined unnecessary to be read by column statistics in Parquet and ORC formats.
Additional transformations
Now that you’ve read and filtered your dataset, you can apply any additional transformations to clean or modify the data. For example, you could augment it with sentiment analysis as described in the previous AWS Glue post.
To keep things simple, you can just pick out some columns from the dataset using the ApplyMapping transformation:
ApplyMapping is a flexible transformation for performing projection and type-casting. In this example, we use it to unnest several fields, such as actor.login, which we map to the top-level actor field. We also cast the id column to a long and the partition columns to integers.
Writing out partitioned data
The final step is to write out your transformed dataset to Amazon S3 so that you can process it with other systems like Amazon Athena. By default, when you write out a DynamicFrame, it is not partitioned—all the output files are written at the top level under the specified output path. Until recently, the only way to write a DynamicFrame into partitions was to convert it into a Spark SQL DataFrame before writing. We are excited to share that DynamicFrames now support native partitioning by a sequence of keys.
You can accomplish this by passing the additional partitionKeys option when creating a sink. For example, the following code writes out the dataset that you created earlier in Parquet format to S3 in directories partitioned by the type field.
Here, $outpath is a placeholder for the base output path in S3. The partitionKeys parameter can also be specified in Python in the connection_options dict:
When you execute this write, the type field is removed from the individual records and is encoded in the directory structure. To demonstrate this, you can list the output path using the aws s3 ls command from the AWS CLI:
PRE type=CommitCommentEvent/
PRE type=CreateEvent/
PRE type=DeleteEvent/
PRE type=ForkEvent/
PRE type=GollumEvent/
PRE type=IssueCommentEvent/
PRE type=IssuesEvent/
PRE type=MemberEvent/
PRE type=PublicEvent/
PRE type=PullRequestEvent/
PRE type=PullRequestReviewCommentEvent/
PRE type=PushEvent/
PRE type=ReleaseEvent/
PRE type=WatchEvent/
As expected, there is a partition for each distinct event type. In this example, we partitioned by a single value, but this is by no means required. For example, if you want to preserve the original partitioning by year, month, and day, you could simply set the partitionKeys option to be Seq(“year”, “month”, “day”).
Conclusion
In this post, we showed you how to work with partitioned data in AWS Glue. Partitioning is a crucial technique for getting the most out of your large datasets. Many tools in the AWS big data ecosystem, including Amazon Athena and Amazon Redshift Spectrum, take advantage of partitions to accelerate query processing. AWS Glue provides mechanisms to crawl, filter, and write partitioned data so that you can structure your data in Amazon S3 however you want, to get the best performance out of your big data applications.
Ben Sowell is a senior software development engineer at AWS Glue. He has worked for more than 5 years on ETL systems to help users unlock the potential of their data. In his free time, he enjoys reading and exploring the Bay Area.
Mohit Saxena is a senior software development engineer at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on cloud. He also enjoys watching movies and reading about the latest technology.
The collective thoughts of the interwebz
By continuing to use the site, you agree to the use of cookies. more information
The cookie settings on this website are set to "allow cookies" to give you the best browsing experience possible. If you continue to use this website without changing your cookie settings or you click "Accept" below then you are consenting to this.