Tag Archives: Amazon EMR

Glenn’s Take on re:Invent 2017 – Part 3

Post Syndicated from Glenn Gore original https://aws.amazon.com/blogs/architecture/glenns-take-on-reinvent-2017-part-3/

Glenn Gore here, Chief Architect for AWS. I was in Las Vegas last week — with 43K others — for re:Invent 2017. I checked in to the Architecture blog here and here with my take on what was interesting about some of the bigger announcements from a cloud-architecture perspective.

In the excitement of so many new services being launched, we sometimes overlook feature updates that, while perhaps not as exciting as Amazon DeepLens, have significant impact on how you architect and develop solutions on AWS.

Amazon DynamoDB is used by more than 100,000 customers around the world, handling over a trillion requests every day. From the start, DynamoDB has offered high availability by natively spanning multiple Availability Zones within an AWS Region. As more customers started building and deploying truly-global applications, there was a need to replicate a DynamoDB table to multiple AWS Regions, allowing for read/write operations to occur in any region where the table was replicated. This update is important for providing a globally-consistent view of information — as users may transition from one region to another — or for providing additional levels of availability, allowing for failover between AWS Regions without loss of information.

There are some interesting concurrency-design aspects you need to be aware of and ensure you can handle correctly. For example, we support the “last writer wins” reconciliation where eventual consistency is being used and an application updates the same item in different AWS Regions at the same time. If you require strongly-consistent read/writes then you must perform all of your read/writes in the same AWS Region. The details behind this can be found in the DynamoDB documentation. Providing a globally-distributed, replicated DynamoDB table simplifies many different use cases and allows for the logic of replication, which may have been pushed up into the application layers to be simplified back down into the data layer.

The other big update for DynamoDB is that you can now back up your DynamoDB table on demand with no impact to performance. One of the features I really like is that when you trigger a backup, it is available instantly, regardless of the size of the table. Behind the scenes, we use snapshots and change logs to ensure a consistent backup. While backup is instant, restoring the table could take some time depending on its size and ranges — from minutes to hours for very large tables.

This feature is super important for those of you who work in regulated industries that often have strict requirements around data retention and backups of data, which sometimes limited the use of DynamoDB or required complex workarounds to implement some sort of backup feature in the past. This often incurred significant, additional costs due to increased read transactions on their DynamoDB tables.

Amazon Simple Storage Service (Amazon S3) was our first-released AWS service over 11 years ago, and it proved the simplicity and scalability of true API-driven architectures in the cloud. Today, Amazon S3 stores trillions of objects, with transactional requests per second reaching into the millions! Dealing with data as objects opened up an incredibly diverse array of use cases ranging from libraries of static images, game binary downloads, and application log data, to massive data lakes used for big data analytics and business intelligence. With Amazon S3, when you accessed your data in an object, you effectively had to write/read the object as a whole or use the range feature to retrieve a part of the object — if possible — in your individual use case.

Now, with Amazon S3 Select, an SQL-like query language is used that can work with delimited text and JSON files, as well as work with GZIP compressed files. We don’t support encryption during the preview of Amazon S3 Select.

Amazon S3 Select provides two major benefits:

  • Faster access
  • Lower running costs

Serverless Lambda functions, where every millisecond matters when you are being charged, will benefit greatly from Amazon S3 Select as data retrieval and processing of your Lambda function will experience significant speedups and cost reductions. For example, we have seen 2x speed improvement and 80% cost reduction with the Serverless MapReduce code.

Other AWS services such as Amazon Athena, Amazon Redshift, and Amazon EMR will support Amazon S3 Select as well as partner offerings including Cloudera and Hortonworks. If you are using Amazon Glacier for longer-term data archival, you will be able to use Amazon Glacier Select to retrieve a subset of your content from within Amazon Glacier.

As the volume of data that can be stored within Amazon S3 and Amazon Glacier continues to scale on a daily basis, we will continue to innovate and develop improved and optimized services that will allow you to work with these magnificently-large data sets while reducing your costs (retrieval and processing). I believe this will also allow you to simplify the transformation and storage of incoming data into Amazon S3 in basic, semi-structured formats as a single copy vs. some of the duplication and reformatting of data sometimes required to do upfront optimizations for downstream processing. Amazon S3 Select largely removes the need for this upfront optimization and instead allows you to store data once and process it based on your individual Amazon S3 Select query per application or transaction need.

Thanks for reading!

Glenn contemplating why CSV format is still relevant in 2017 (Italy).

Amazon EC2 Update – Streamlined Access to Spot Capacity, Smooth Price Changes, Instance Hibernation

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/amazon-ec2-update-streamlined-access-to-spot-capacity-smooth-price-changes-instance-hibernation/

EC2 Spot Instances give you access to spare compute capacity in the AWS Cloud. Our customers use fleets of Spot Instances to power their CI/CD environments & traffic generators, host web servers & microservices, render movies, and to run many types of analytics jobs, all at prices that offer significant savings in comparison to On-Demand Instances.

New Streamlined Access
Today we are introducing a new, streamlined access model for Spot Instances. You simply indicate your desire to use Spot capacity when you launch an instance via the RunInstances function, the run-instances command, or the AWS Management Console to submit a request that will be fulfilled as long as the capacity is available. With no extra effort on your part you’ll save up to 90% off of the On-Demand price for the instance type, allowing you to boost your overall application throughput by up to 10x for the same budget. The instances that you launch in this way will continue to run until you terminate them or if EC2 needs to reclaim them for On-Demand usage. At that point the instance will be given the usual 2-minute warning and then reclaimed, making this a great fit for applications that are fault-tolerant.

Unlike the old model which required an understanding of Spot markets, bidding, and calls to a standalone asynchronous API, the new model is synchronous and as easy to use as On-Demand. Your code or your script receives an Instance ID immediately and need not check back to see if the request has been processed and accepted.

We’ve made this as clean and as simple as possible, with the expectation that it will be easy to modify many current scripts and applications to request and make use of Spot capacity. If you want to exercise additional control over your Spot instance budget, you have the option to specify a maximum price when you make a request for capacity. If you use Spot capacity to power your Amazon EMR, Amazon ECS, or AWS Batch clusters, or if you launch Spot instances by way of a AWS CloudFormation template or Auto Scaling Group, you will benefit from this new model without having to make any changes.

Applications that are built around RequestSpotInstances or RequestSpotFleet will continue to work just fine with no changes. However, you now have the option to make requests that do not include the SpotPrice parameter.

Smooth Price Changes
As part of today’s launch we are also changing the way that Spot prices change, moving to a model where prices adjust more gradually, based on longer-term trends in supply and demand. As I mentioned earlier, you will continue to save an average of 70-90% off the On-Demand price, and you will continue to pay the Spot price that’s in effect for the time period your instances are running. Applications built around our Spot Fleet feature will continue to automatically diversify placement of their Spot Instances across the most cost-effective pools based on the configuration you specified when you created the fleet.

Spot in Action
To launch a Spot Instance from the command line; simply specify the Spot market:

$ aws ec2 run-instances –-market Spot --image-id ami-1a2b3c4d --count 1 --instance-type c3.large 

Instance Hibernation
If you run workloads that keep a lot of state in memory, you will love this new feature!

You can arrange for instances to save their in-memory state when they are reclaimed, allowing the instances and the applications on them to pick up where they left off when capacity is once again available, just like closing and then opening your laptop. This feature works on C3, C4, and certain sizes of R3, R4, and M4 instances running Amazon Linux, Ubuntu, or Windows Server, and is supported by the EC2 Hibernation Agent.

The in-memory state is written to the root EBS volume of the instance using space that is set-aside when the instance launches. The private IP address and any Elastic IP Addresses are also preserved across a stop/start cycle.

Jeff;

Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production

Post Syndicated from Rafi Ton original https://aws.amazon.com/blogs/big-data/using-amazon-redshift-spectrum-amazon-athena-and-aws-glue-with-node-js-in-production/

This is a guest post by Rafi Ton, founder and CEO of NUVIAD. NUVIAD is, in their own words, “a mobile marketing platform providing professional marketers, agencies and local businesses state of the art tools to promote their products and services through hyper targeting, big data analytics and advanced machine learning tools.”

At NUVIAD, we’ve been using Amazon Redshift as our main data warehouse solution for more than 3 years.

We store massive amounts of ad transaction data that our users and partners analyze to determine ad campaign strategies. When running real-time bidding (RTB) campaigns in large scale, data freshness is critical so that our users can respond rapidly to changes in campaign performance. We chose Amazon Redshift because of its simplicity, scalability, performance, and ability to load new data in near real time.

Over the past three years, our customer base grew significantly and so did our data. We saw our Amazon Redshift cluster grow from three nodes to 65 nodes. To balance cost and analytics performance, we looked for a way to store large amounts of less-frequently analyzed data at a lower cost. Yet, we still wanted to have the data immediately available for user queries and to meet their expectations for fast performance. We turned to Amazon Redshift Spectrum.

In this post, I explain the reasons why we extended Amazon Redshift with Redshift Spectrum as our modern data warehouse. I cover how our data growth and the need to balance cost and performance led us to adopt Redshift Spectrum. I also share key performance metrics in our environment, and discuss the additional AWS services that provide a scalable and fast environment, with data available for immediate querying by our growing user base.

Amazon Redshift as our foundation

The ability to provide fresh, up-to-the-minute data to our customers and partners was always a main goal with our platform. We saw other solutions provide data that was a few hours old, but this was not good enough for us. We insisted on providing the freshest data possible. For us, that meant loading Amazon Redshift in frequent micro batches and allowing our customers to query Amazon Redshift directly to get results in near real time.

The benefits were immediately evident. Our customers could see how their campaigns performed faster than with other solutions, and react sooner to the ever-changing media supply pricing and availability. They were very happy.

However, this approach required Amazon Redshift to store a lot of data for long periods, and our data grew substantially. In our peak, we maintained a cluster running 65 DC1.large nodes. The impact on our Amazon Redshift cluster was evident, and we saw our CPU utilization grow to 90%.

Why we extended Amazon Redshift to Redshift Spectrum

Redshift Spectrum gives us the ability to run SQL queries using the powerful Amazon Redshift query engine against data stored in Amazon S3, without needing to load the data. With Redshift Spectrum, we store data where we want, at the cost that we want. We have the data available for analytics when our users need it with the performance they expect.

Seamless scalability, high performance, and unlimited concurrency

Scaling Redshift Spectrum is a simple process. First, it allows us to leverage Amazon S3 as the storage engine and get practically unlimited data capacity.

Second, if we need more compute power, we can leverage Redshift Spectrum’s distributed compute engine over thousands of nodes to provide superior performance – perfect for complex queries running against massive amounts of data.

Third, all Redshift Spectrum clusters access the same data catalog so that we don’t have to worry about data migration at all, making scaling effortless and seamless.

Lastly, since Redshift Spectrum distributes queries across potentially thousands of nodes, they are not affected by other queries, providing much more stable performance and unlimited concurrency.

Keeping it SQL

Redshift Spectrum uses the same query engine as Amazon Redshift. This means that we did not need to change our BI tools or query syntax, whether we used complex queries across a single table or joins across multiple tables.

An interesting capability introduced recently is the ability to create a view that spans both Amazon Redshift and Redshift Spectrum external tables. With this feature, you can query frequently accessed data in your Amazon Redshift cluster and less-frequently accessed data in Amazon S3, using a single view.

Leveraging Parquet for higher performance

Parquet is a columnar data format that provides superior performance and allows Redshift Spectrum (or Amazon Athena) to scan significantly less data. With less I/O, queries run faster and we pay less per query. You can read all about Parquet at https://parquet.apache.org/ or https://en.wikipedia.org/wiki/Apache_Parquet.

Lower cost

From a cost perspective, we pay standard rates for our data in Amazon S3, and only small amounts per query to analyze data with Redshift Spectrum. Using the Parquet format, we can significantly reduce the amount of data scanned. Our costs are now lower, and our users get fast results even for large complex queries.

What we learned about Amazon Redshift vs. Redshift Spectrum performance

When we first started looking at Redshift Spectrum, we wanted to put it to the test. We wanted to know how it would compare to Amazon Redshift, so we looked at two key questions:

  1. What is the performance difference between Amazon Redshift and Redshift Spectrum on simple and complex queries?
  2. Does the data format impact performance?

During the migration phase, we had our dataset stored in Amazon Redshift and S3 as CSV/GZIP and as Parquet file formats. We tested three configurations:

  • Amazon Redshift cluster with 28 DC1.large nodes
  • Redshift Spectrum using CSV/GZIP
  • Redshift Spectrum using Parquet

We performed benchmarks for simple and complex queries on one month’s worth of data. We tested how much time it took to perform the query, and how consistent the results were when running the same query multiple times. The data we used for the tests was already partitioned by date and hour. Properly partitioning the data improves performance significantly and reduces query times.

Simple query

First, we tested a simple query aggregating billing data across a month:

SELECT 
  user_id, 
  count(*) AS impressions, 
  SUM(billing)::decimal /1000000 AS billing 
FROM <table_name> 
WHERE 
  date >= '2017-08-01' AND 
  date <= '2017-08-31'  
GROUP BY 
  user_id;

We ran the same query seven times and measured the response times (red marking the longest time and green the shortest time):

Execution Time (seconds)
  Amazon Redshift Redshift Spectrum
CSV
Redshift Spectrum Parquet
Run #1 39.65 45.11 11.92
Run #2 15.26 43.13 12.05
Run #3 15.27 46.47 13.38
Run #4 21.22 51.02 12.74
Run #5 17.27 43.35 11.76
Run #6 16.67 44.23 13.67
Run #7 25.37 40.39 12.75
Average 21.53  44.82 12.61

For simple queries, Amazon Redshift performed better than Redshift Spectrum, as we thought, because the data is local to Amazon Redshift.

What was surprising was that using Parquet data format in Redshift Spectrum significantly beat ‘traditional’ Amazon Redshift performance. For our queries, using Parquet data format with Redshift Spectrum delivered an average 40% performance gain over traditional Amazon Redshift. Furthermore, Redshift Spectrum showed high consistency in execution time with a smaller difference between the slowest run and the fastest run.

Comparing the amount of data scanned when using CSV/GZIP and Parquet, the difference was also significant:

Data Scanned (GB)
CSV (Gzip) 135.49
Parquet 2.83

Because we pay only for the data scanned by Redshift Spectrum, the cost saving of using Parquet is evident and substantial.

Complex query

Next, we compared the same three configurations with a complex query.

Execution Time (seconds)
  Amazon Redshift Redshift Spectrum CSV Redshift Spectrum Parquet
Run #1 329.80 84.20 42.40
Run #2 167.60 65.30 35.10
Run #3 165.20 62.20 23.90
Run #4 273.90 74.90 55.90
Run #5 167.70 69.00 58.40
Average 220.84 71.12 43.14

This time, Redshift Spectrum using Parquet cut the average query time by 80% compared to traditional Amazon Redshift!

Bottom line: For complex queries, Redshift Spectrum provided a 67% performance gain over Amazon Redshift. Using the Parquet data format, Redshift Spectrum delivered an 80% performance improvement over Amazon Redshift. For us, this was substantial.

Optimizing the data structure for different workloads

Because the cost of S3 is relatively inexpensive and we pay only for the data scanned by each query, we believe that it makes sense to keep our data in different formats for different workloads and different analytics engines. It is important to note that we can have any number of tables pointing to the same data on S3. It all depends on how we partition the data and update the table partitions.

Data permutations

For example, we have a process that runs every minute and generates statistics for the last minute of data collected. With Amazon Redshift, this would be done by running the query on the table with something as follows:

SELECT 
  user, 
  COUNT(*) 
FROM 
  events_table 
WHERE 
  ts BETWEEN ‘2017-08-01 14:00:00’ AND ‘2017-08-01 14:00:59’ 
GROUP BY 
  user;

(Assuming ‘ts’ is your column storing the time stamp for each event.)

With Redshift Spectrum, we pay for the data scanned in each query. If the data is partitioned by the minute instead of the hour, a query looking at one minute would be 1/60th the cost. If we use a temporary table that points only to the data of the last minute, we save that unnecessary cost.

Creating Parquet data efficiently

On the average, we have 800 instances that process our traffic. Each instance sends events that are eventually loaded into Amazon Redshift. When we started three years ago, we would offload data from each server to S3 and then perform a periodic copy command from S3 to Amazon Redshift.

Recently, Amazon Kinesis Firehose added the capability to offload data directly to Amazon Redshift. While this is now a viable option, we kept the same collection process that worked flawlessly and efficiently for three years.

This changed, however, when we incorporated Redshift Spectrum. With Redshift Spectrum, we needed to find a way to:

  • Collect the event data from the instances.
  • Save the data in Parquet format.
  • Partition the data effectively.

To accomplish this, we save the data as CSV and then transform it to Parquet. The most effective method to generate the Parquet files is to:

  1. Send the data in one-minute intervals from the instances to Kinesis Firehose with an S3 temporary bucket as the destination.
  2. Aggregate hourly data and convert it to Parquet using AWS Lambda and AWS Glue.
  3. Add the Parquet data to S3 by updating the table partitions.

With this new process, we had to give more attention to validating the data before we sent it to Kinesis Firehose, because a single corrupted record in a partition fails queries on that partition.

Data validation

To store our click data in a table, we considered the following SQL create table command:

create external TABLE spectrum.blog_clicks (
    user_id varchar(50),
    campaign_id varchar(50),
    os varchar(50),
    ua varchar(255),
    ts bigint,
    billing float
)
partitioned by (date date, hour smallint)  
stored as parquet
location 's3://nuviad-temp/blog/clicks/';

The above statement defines a new external table (all Redshift Spectrum tables are external tables) with a few attributes. We stored ‘ts’ as a Unix time stamp and not as Timestamp, and billing data is stored as float and not decimal (more on that later). We also said that the data is partitioned by date and hour, and then stored as Parquet on S3.

First, we need to get the table definitions. This can be achieved by running the following query:

SELECT 
  * 
FROM 
  svv_external_columns 
WHERE 
  tablename = 'blog_clicks';

This query lists all the columns in the table with their respective definitions:

schemaname tablename columnname external_type columnnum part_key
spectrum blog_clicks user_id varchar(50) 1 0
spectrum blog_clicks campaign_id varchar(50) 2 0
spectrum blog_clicks os varchar(50) 3 0
spectrum blog_clicks ua varchar(255) 4 0
spectrum blog_clicks ts bigint 5 0
spectrum blog_clicks billing double 6 0
spectrum blog_clicks date date 7 1
spectrum blog_clicks hour smallint 8 2

Now we can use this data to create a validation schema for our data:

const rtb_request_schema = {
    "name": "clicks",
    "items": {
        "user_id": {
            "type": "string",
            "max_length": 100
        },
        "campaign_id": {
            "type": "string",
            "max_length": 50
        },
        "os": {
            "type": "string",
            "max_length": 50            
        },
        "ua": {
            "type": "string",
            "max_length": 255            
        },
        "ts": {
            "type": "integer",
            "min_value": 0,
            "max_value": 9999999999999
        },
        "billing": {
            "type": "float",
            "min_value": 0,
            "max_value": 9999999999999
        }
    }
};

Next, we create a function that uses this schema to validate data:

function valueIsValid(value, item_schema) {
    if (schema.type == 'string') {
        return (typeof value == 'string' && value.length <= schema.max_length);
    }
    else if (schema.type == 'integer') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'float' || schema.type == 'double') {
        return (typeof value == 'number' && value >= schema.min_value && value <= schema.max_value);
    }
    else if (schema.type == 'boolean') {
        return typeof value == 'boolean';
    }
    else if (schema.type == 'timestamp') {
        return (new Date(value)).getTime() > 0;
    }
    else {
        return true;
    }
}

Near real-time data loading with Kinesis Firehose

On Kinesis Firehose, we created a new delivery stream to handle the events as follows:

Delivery stream name: events
Source: Direct PUT
S3 bucket: nuviad-events
S3 prefix: rtb/
IAM role: firehose_delivery_role_1
Data transformation: Disabled
Source record backup: Disabled
S3 buffer size (MB): 100
S3 buffer interval (sec): 60
S3 Compression: GZIP
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

This delivery stream aggregates event data every minute, or up to 100 MB, and writes the data to an S3 bucket as a CSV/GZIP compressed file. Next, after we have the data validated, we can safely send it to our Kinesis Firehose API:

if (validated) {
    let itemString = item.join('|')+'\n'; //Sending csv delimited by pipe and adding new line

    let params = {
        DeliveryStreamName: 'events',
        Record: {
            Data: itemString
        }
    };

    firehose.putRecord(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);        
        }
        else {
            // Continue to your next step 
        }
    });
}

Now, we have a single CSV file representing one minute of event data stored in S3. The files are named automatically by Kinesis Firehose by adding a UTC time prefix in the format YYYY/MM/DD/HH before writing objects to S3. Because we use the date and hour as partitions, we need to change the file naming and location to fit our Redshift Spectrum schema.

Automating data distribution using AWS Lambda

We created a simple Lambda function triggered by an S3 put event that copies the file to a different location (or locations), while renaming it to fit our data structure and processing flow. As mentioned before, the files generated by Kinesis Firehose are structured in a pre-defined hierarchy, such as:

S3://your-bucket/your-prefix/2017/08/01/20/events-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz

All we need to do is parse the object name and restructure it as we see fit. In our case, we did the following (the event is an object received in the Lambda function with all the data about the object written to S3):

/*
	object key structure in the event object:
your-prefix/2017/08/01/20/event-4-2017-08-01-20-06-06-536f5c40-6893-4ee4-907d-81e4d3b09455.gz
	*/

let key_parts = event.Records[0].s3.object.key.split('/'); 

let event_type = key_parts[0];
let date = key_parts[1] + '-' + key_parts[2] + '-' + key_parts[3];
let hour = key_parts[4];
if (hour.indexOf('0') == 0) {
 		hour = parseInt(hour, 10) + '';
}
    
let parts1 = key_parts[5].split('-');
let minute = parts1[7];
if (minute.indexOf('0') == 0) {
        minute = parseInt(minute, 10) + '';
}

Now, we can redistribute the file to the two destinations we need—one for the minute processing task and the other for hourly aggregation:

    copyObjectToHourlyFolder(event, date, hour, minute)
        .then(copyObjectToMinuteFolder.bind(null, event, date, hour, minute))
        .then(addPartitionToSpectrum.bind(null, event, date, hour, minute))
        .then(deleteOldMinuteObjects.bind(null, event))
        .then(deleteStreamObject.bind(null, event))        
        .then(result => {
            callback(null, { message: 'done' });            
        })
        .catch(err => {
            console.error(err);
            callback(null, { message: err });            
        }); 

Kinesis Firehose stores the data in a temporary folder. We copy the object to another folder that holds the data for the last processed minute. This folder is connected to a small Redshift Spectrum table where the data is being processed without needing to scan a much larger dataset. We also copy the data to a folder that holds the data for the entire hour, to be later aggregated and converted to Parquet.

Because we partition the data by date and hour, we created a new partition on the Redshift Spectrum table if the processed minute is the first minute in the hour (that is, minute 0). We ran the following:

ALTER TABLE 
  spectrum.events 
ADD partition
  (date='2017-08-01', hour=0) 
  LOCATION 's3://nuviad-temp/events/2017-08-01/0/';

After the data is processed and added to the table, we delete the processed data from the temporary Kinesis Firehose storage and from the minute storage folder.

Migrating CSV to Parquet using AWS Glue and Amazon EMR

The simplest way we found to run an hourly job converting our CSV data to Parquet is using Lambda and AWS Glue (and thanks to the awesome AWS Big Data team for their help with this).

Creating AWS Glue jobs

What this simple AWS Glue script does:

  • Gets parameters for the job, date, and hour to be processed
  • Creates a Spark EMR context allowing us to run Spark code
  • Reads CSV data into a DataFrame
  • Writes the data as Parquet to the destination S3 bucket
  • Adds or modifies the Redshift Spectrum / Amazon Athena table partition for the table
import sys
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME','day_partition_key', 'hour_partition_key', 'day_partition_value', 'hour_partition_value' ])

#day_partition_key = "partition_0"
#hour_partition_key = "partition_1"
#day_partition_value = "2017-08-01"
#hour_partition_value = "0"

day_partition_key = args['day_partition_key']
hour_partition_key = args['hour_partition_key']
day_partition_value = args['day_partition_value']
hour_partition_value = args['hour_partition_value']

print("Running for " + day_partition_value + "/" + hour_partition_value)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.option("delimiter","|").csv("s3://nuviad-temp/events/"+day_partition_value+"/"+hour_partition_value)
df.registerTempTable("data")

df1 = spark.sql("select _c0 as user_id, _c1 as campaign_id, _c2 as os, _c3 as ua, cast(_c4 as bigint) as ts, cast(_c5 as double) as billing from data")

df1.repartition(1).write.mode("overwrite").parquet("s3://nuviad-temp/parquet/"+day_partition_value+"/hour="+hour_partition_value)

client = boto3.client('athena', region_name='us-east-1')

response = client.start_query_execution(
    QueryString='alter table parquet_events add if not exists partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ')  location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

response = client.start_query_execution(
    QueryString='alter table parquet_events partition(' + day_partition_key + '=\'' + day_partition_value + '\',' + hour_partition_key + '=' + hour_partition_value + ') set location \'s3://nuviad-temp/parquet/' + day_partition_value + '/hour=' + hour_partition_value + '\'' ,
    QueryExecutionContext={
        'Database': 'spectrumdb'
    },
    ResultConfiguration={
        'OutputLocation': 's3://nuviad-temp/convertresults'
    }
)

job.commit()

Note: Because Redshift Spectrum and Athena both use the AWS Glue Data Catalog, we could use the Athena client to add the partition to the table.

Here are a few words about float, decimal, and double. Using decimal proved to be more challenging than we expected, as it seems that Redshift Spectrum and Spark use them differently. Whenever we used decimal in Redshift Spectrum and in Spark, we kept getting errors, such as:

S3 Query Exception (Fetch). Task failed due to an internal error. File 'https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parquet has an incompatible Parquet schema for column 's3://nuviad-events/events.lat'. Column type: DECIMAL(18, 8), Parquet schema:\noptional float lat [i:4 d:1 r:0]\n (https://s3-external-1.amazonaws.com/nuviad-temp/events/2017-08-01/hour=2/part-00017-48ae5b6b-906e-4875-8cde-bc36c0c6d0ca.c000.snappy.parq

We had to experiment with a few floating-point formats until we found that the only combination that worked was to define the column as double in the Spark code and float in Spectrum. This is the reason you see billing defined as float in Spectrum and double in the Spark code.

Creating a Lambda function to trigger conversion

Next, we created a simple Lambda function to trigger the AWS Glue script hourly using a simple Python code:

import boto3
import json
from datetime import datetime, timedelta
 
client = boto3.client('glue')
 
def lambda_handler(event, context):
    last_hour_date_time = datetime.now() - timedelta(hours = 1)
    day_partition_value = last_hour_date_time.strftime("%Y-%m-%d") 
    hour_partition_value = last_hour_date_time.strftime("%-H") 
    response = client.start_job_run(
    JobName='convertEventsParquetHourly',
    Arguments={
         '--day_partition_key': 'date',
         '--hour_partition_key': 'hour',
         '--day_partition_value': day_partition_value,
         '--hour_partition_value': hour_partition_value
         }
    )

Using Amazon CloudWatch Events, we trigger this function hourly. This function triggers an AWS Glue job named ‘convertEventsParquetHourly’ and runs it for the previous hour, passing job names and values of the partitions to process to AWS Glue.

Redshift Spectrum and Node.js

Our development stack is based on Node.js, which is well-suited for high-speed, light servers that need to process a huge number of transactions. However, a few limitations of the Node.js environment required us to create workarounds and use other tools to complete the process.

Node.js and Parquet

The lack of Parquet modules for Node.js required us to implement an AWS Glue/Amazon EMR process to effectively migrate data from CSV to Parquet. We would rather save directly to Parquet, but we couldn’t find an effective way to do it.

One interesting project in the works is the development of a Parquet NPM by Marc Vertes called node-parquet (https://www.npmjs.com/package/node-parquet). It is not in a production state yet, but we think it would be well worth following the progress of this package.

Timestamp data type

According to the Parquet documentation, Timestamp data are stored in Parquet as 64-bit integers. However, JavaScript does not support 64-bit integers, because the native number type is a 64-bit double, giving only 53 bits of integer range.

The result is that you cannot store Timestamp correctly in Parquet using Node.js. The solution is to store Timestamp as string and cast the type to Timestamp in the query. Using this method, we did not witness any performance degradation whatsoever.

Lessons learned

You can benefit from our trial-and-error experience.

Lesson #1: Data validation is critical

As mentioned earlier, a single corrupt entry in a partition can fail queries running against this partition, especially when using Parquet, which is harder to edit than a simple CSV file. Make sure that you validate your data before scanning it with Redshift Spectrum.

Lesson #2: Structure and partition data effectively

One of the biggest benefits of using Redshift Spectrum (or Athena for that matter) is that you don’t need to keep nodes up and running all the time. You pay only for the queries you perform and only for the data scanned per query.

Keeping different permutations of your data for different queries makes a lot of sense in this case. For example, you can partition your data by date and hour to run time-based queries, and also have another set partitioned by user_id and date to run user-based queries. This results in faster and more efficient performance of your data warehouse.

Storing data in the right format

Use Parquet whenever you can. The benefits of Parquet are substantial. Faster performance, less data to scan, and much more efficient columnar format. However, it is not supported out-of-the-box by Kinesis Firehose, so you need to implement your own ETL. AWS Glue is a great option.

Creating small tables for frequent tasks

When we started using Redshift Spectrum, we saw our Amazon Redshift costs jump by hundreds of dollars per day. Then we realized that we were unnecessarily scanning a full day’s worth of data every minute. Take advantage of the ability to define multiple tables on the same S3 bucket or folder, and create temporary and small tables for frequent queries.

Lesson #3: Combine Athena and Redshift Spectrum for optimal performance

Moving to Redshift Spectrum also allowed us to take advantage of Athena as both use the AWS Glue Data Catalog. Run fast and simple queries using Athena while taking advantage of the advanced Amazon Redshift query engine for complex queries using Redshift Spectrum.

Redshift Spectrum excels when running complex queries. It can push many compute-intensive tasks, such as predicate filtering and aggregation, down to the Redshift Spectrum layer, so that queries use much less of your cluster’s processing capacity.

Lesson #4: Sort your Parquet data within the partition

We achieved another performance improvement by sorting data within the partition using sortWithinPartitions(sort_field). For example:

df.repartition(1).sortWithinPartitions("campaign_id")…

Conclusion

We were extremely pleased with using Amazon Redshift as our core data warehouse for over three years. But as our client base and volume of data grew substantially, we extended Amazon Redshift to take advantage of scalability, performance, and cost with Redshift Spectrum.

Redshift Spectrum lets us scale to virtually unlimited storage, scale compute transparently, and deliver super-fast results for our users. With Redshift Spectrum, we store data where we want at the cost we want, and have the data available for analytics when our users need it with the performance they expect.


About the Author

With 7 years of experience in the AdTech industry and 15 years in leading technology companies, Rafi Ton is the founder and CEO of NUVIAD. He enjoys exploring new technologies and putting them to use in cutting edge products and services, in the real world generating real money. Being an experienced entrepreneur, Rafi believes in practical-programming and fast adaptation of new technologies to achieve a significant market advantage.

 

 

AWS Achieves FedRAMP JAB Moderate Provisional Authorization for 20 Services in the AWS US East/West Region

Post Syndicated from Chris Gile original https://aws.amazon.com/blogs/security/aws-achieves-fedramp-jab-moderate-authorization-for-20-services-in-us-eastwest/

The AWS US East/West Region has received a Provisional Authority to Operate (P-ATO) from the Joint Authorization Board (JAB) at the Federal Risk and Authorization Management Program (FedRAMP) Moderate baseline.

Though AWS has maintained an AWS US East/West Region Agency-ATO since early 2013, this announcement represents AWS’s carefully deliberated move to the JAB for the centralized maintenance of our P-ATO for 10 services already authorized. This also includes the addition of 10 new services to our FedRAMP program (see the complete list of services below). This doubles the number of FedRAMP Moderate services available to our customers to enable increased use of the cloud and support modernized IT missions. Our public sector customers now can leverage this FedRAMP P-ATO as a baseline for their own authorizations and look to the JAB for centralized Continuous Monitoring reporting and updates. In a significant enhancement for our partners that build their solutions on the AWS US East/West Region, they can now achieve FedRAMP JAB P-ATOs of their own for their Platform as a Service (PaaS) and Software as a Service (SaaS) offerings.

In line with FedRAMP security requirements, our independent FedRAMP assessment was completed in partnership with a FedRAMP accredited Third Party Assessment Organization (3PAO) on our technical, management, and operational security controls to validate that they meet or exceed FedRAMP’s Moderate baseline requirements. Effective immediately, you can begin leveraging this P-ATO for the following 20 services in the AWS US East/West Region:

  • Amazon Aurora (MySQL)*
  • Amazon CloudWatch Logs*
  • Amazon DynamoDB
  • Amazon Elastic Block Store
  • Amazon Elastic Compute Cloud
  • Amazon EMR*
  • Amazon Glacier*
  • Amazon Kinesis Streams*
  • Amazon RDS (MySQL, Oracle, Postgres*)
  • Amazon Redshift
  • Amazon Simple Notification Service*
  • Amazon Simple Queue Service*
  • Amazon Simple Storage Service
  • Amazon Simple Workflow Service*
  • Amazon Virtual Private Cloud
  • AWS CloudFormation*
  • AWS CloudTrail*
  • AWS Identity and Access Management
  • AWS Key Management Service
  • Elastic Load Balancing

* Services with first-time FedRAMP Moderate authorizations

We continue to work with the FedRAMP Project Management Office (PMO), other regulatory and compliance bodies, and our customers and partners to ensure that we are raising the bar on our customers’ security and compliance needs.

To learn more about how AWS helps customers meet their security and compliance requirements, see the AWS Compliance website. To learn about what other public sector customers are doing on AWS, see our Government, Education, and Nonprofits Case Studies and Customer Success Stories. To review the public posting of our FedRAMP authorizations, see the FedRAMP Marketplace.

– Chris Gile, Senior Manager, AWS Public Sector Risk and Compliance

Tableau 10.4 Supports Amazon Redshift Spectrum with External Amazon S3 Tables

Post Syndicated from Robin Cottiss original https://aws.amazon.com/blogs/big-data/tableau-10-4-supports-amazon-redshift-spectrum-with-external-amazon-s3-tables/

This is a guest post by Robin Cottiss, strategic customer consultant, Russell Christopher, staff product manager, and Vaidy Krishnan, senior manager of product marketing, at Tableau. Tableau, in their own words, “helps anyone quickly analyze, visualize, and share information. More than 61,000 customer accounts get rapid results with Tableau in the office and on the go. Over 300,000 people use Tableau Public to share public data in their blogs and websites.”

We’re excited to announce today an update to our Amazon Redshift connector with support for Amazon Redshift Spectrum to analyze data in external Amazon S3 tables. This feature, the direct result of joint engineering and testing work performed by the teams at Tableau and AWS, was released as part of Tableau 10.3.3 and will be available broadly in Tableau 10.4.1. With this update, you can quickly and directly connect Tableau to data in Amazon Redshift and analyze it in conjunction with data in Amazon S3—all with drag-and-drop ease.

This connector is yet another in a series of market-leading integrations of Tableau with AWS’s analytics platform, with services such as Amazon Redshift, Amazon EMR, and Amazon Athena. These integrations have allowed Tableau to become the natural choice of tool for analyzing data stored on AWS. Beyond this, Tableau Server runs seamlessly in the AWS Cloud infrastructure. If you prefer to deploy all your applications inside AWS, you have a complete solution offering from Tableau.

How does support for Amazon Redshift Spectrum help you?

If you’re like many Tableau customers, you have large buckets of data stored in Amazon S3. You might need to access this data frequently and store it in a consistent, highly structured format. If so, you can provision it to a data warehouse like Amazon Redshift. You might also want to explore this S3 data on an ad hoc basis. For example, you might want to determine whether or not to provision the data, and where—options might be Hadoop, Impala, Amazon EMR, or Amazon Redshift. To do so, you can use Amazon Athena, a serverless interactive query service from AWS that requires no infrastructure setup and management.

But what if you want to analyze both the frequently accessed data stored locally in Amazon Redshift AND your full datasets stored cost-effectively in Amazon S3? What if you want the throughput of disk and sophisticated query optimization of Amazon Redshift AND a service that combines a serverless scale-out processing capability with the massively reliable and scalable S3 infrastructure? What if you want the super-fast performance of Amazon Redshift AND support for open storage formats (for example, Parquet or ORC) in S3?

To enable these AND and resolve the tyranny of ORs, AWS launched Amazon Redshift Spectrum earlier this year.

Amazon Redshift Spectrum gives you the freedom to store your data where you want, in the format you want, and have it available for processing when you need it. Since the Amazon Redshift Spectrum launch, Tableau has worked tirelessly to provide best-in-class support for this new service. With Tableau and Redshift Spectrum, you can extend your Amazon Redshift analyses out to the entire universe of data in your S3 data lakes.

This latest update has been tested by many customers with very positive feedback. One such customer is the world’s largest food product distributor, Sysco—you can watch their session referencing the Amazon Spectrum integration at Tableau Conference 2017. Sysco also plans to reprise its “Tableau on AWS” story again in a month’s time at AWS re:Invent.

Now, I’d like to use a concrete example to demonstrate how Tableau works with Amazon Redshift Spectrum. In this example, I also show you how and why you might want to connect to your AWS data in different ways.

The setup

I use the pipeline described following to ingest, process, and analyze data with Tableau on an AWS stack. The source data is the New York City Taxi dataset, which has 9 years’ worth of taxi rides activity (including pick-up and drop-off location, amount paid, payment type, and so on) captured in 1.2 billion records.

In this pipeline, this data lands in S3, is cleansed and partitioned by using Amazon EMR, and is then converted to a columnar Parquet format that is analytically optimized. You can point Tableau to the raw data in S3 by using Amazon Athena. You can also access the cleansed data with Tableau using Presto through your Amazon EMR cluster.

Why use Tableau this early in the pipeline? Because sometimes you want to understand what’s there and what questions are worth asking before you even start the analysis.

After you find out what those questions are and determine if this sort of analysis has long-term usefulness, you can automate and optimize that pipeline. You do this to add new data as soon as possible as it arrives, to get it to the processes and people that need it. You might also want to provision this data to a highly performant “hotter” layer (Amazon Redshift or Tableau Extract) for repeated access.

In the illustration preceding, S3 contains the raw denormalized ride data at the timestamp level of granularity. This S3 data is the fact table. Amazon Redshift has the time dimensions broken out by date, month, and year, and also has the taxi zone information.

Now imagine I want to know where and when taxi pickups happen on a certain date in a certain borough. With support for Amazon Redshift Spectrum, I can now join the S3 tables with the Amazon Redshift dimensions, as shown following.

I can next analyze the data in Tableau to produce a borough-by-borough view of New York City ride density on Christmas Day 2015.

Or I can hone in on just Manhattan and identify pickup hotspots, with ride charges way above the average!

With Amazon Redshift Spectrum, you now have a fast, cost-effective engine that minimizes data processed with dynamic partition pruning. You can further improve query performance by reducing the data scanned. You do this by partitioning and compressing data and by using a columnar format for storage.

At the end of the day, which engine you use behind Tableau is a function of what you want to optimize for. Some possible engines are Amazon Athena, Amazon Redshift, and Redshift Spectrum, or you can bring a subset of data into Tableau Extract. Factors in planning optimization include these:

  • Are you comfortable with the serverless cost model of Amazon Athena and potential full scans? Or do you prefer the advantages of no setup?
  • Do you want the throughput of local disk?
  • Effort and time of setup. Are you okay with the lead-time of an Amazon Redshift cluster setup, as opposed to just bringing everything into Tableau Extract?

To meet the many needs of our customers, Tableau’s approach is simple: It’s all about choice. The choice of how you want to connect to and analyze your data. Throughout the history of our product and into the future, we have and will continue to empower choice for customers.

For more on how to deal with choice, as you go about making architecture decisions for your enterprise, watch this big data strategy session my friend Robin Cottiss and I delivered at Tableau Conference 2017. This session includes several customer examples leveraging the Tableau on AWS platform, and also a run-through of the aforementioned demonstration.

If you’re curious to learn more about analyzing data with Tableau on Amazon Redshift we encourage you to check out the following resources:

Hot Startups on AWS – October 2017

Post Syndicated from Tina Barr original https://aws.amazon.com/blogs/aws/hot-startups-on-aws-october-2017/

In 2015, the Centers for Medicare and Medicaid Services (CMS) reported that healthcare spending made up 17.8% of the U.S. GDP – that’s almost $3.2 trillion or $9,990 per person. By 2025, the CMS estimates this number will increase to nearly 20%. As cloud technology evolves in the healthcare and life science industries, we are seeing how companies of all sizes are using AWS to provide powerful and innovative solutions to customers across the globe. This month we are excited to feature the following startups:

  • ClearCare – helping home care agencies operate efficiently and grow their business.
  • DNAnexus – providing a cloud-based global network for sharing and managing genomic data.

ClearCare (San Francisco, CA)

ClearCare envisions a future where home care is the only choice for aging in place. Home care agencies play a critical role in the economy and their communities by significantly lowering the overall cost of care, reducing the number of hospital admissions, and bending the cost curve of aging. Patients receiving home care typically have multiple chronic conditions and functional limitations, driving over $190 billion in healthcare spending in the U.S. each year. To offset these costs, health insurance payers are developing in-home care management programs for patients. ClearCare’s goal is to help home care agencies leverage technology to improve costs, outcomes, and quality of life for the aging population. The company’s powerful software platform is specifically designed for use by non-medical, in-home care agencies to manage their businesses.

Founder and CEO Geoff Nudd created ClearCare because of his own grandmother’s need for care. Keeping family members and caregivers up to date on a loved one’s well being can be difficult, so Geoff created what is now ClearCare’s Family Room, which enables caregivers and agency staff to check schedules and receive real-time updates about what’s happening in the home. Since then, agencies have provided feedback on others areas of their businesses that could be streamlined. ClearCare has now built over 20 modules to help home care agencies optimize operations with services including a telephony service, billing and payroll, and more. ClearCare now serves over 4,000 home care agencies, representing 500,000 caregivers and 400,000 seniors.

Using AWS, ClearCare is able to spin up reliable infrastructure for proofs of concept and iterate on those systems to quickly get value to market. The company runs many AWS services including Amazon Elasticsearch Service, Amazon RDS, and Amazon CloudFront. Amazon EMR and Amazon Athena have enabled ClearCare to build a Hadoop-based ETL and data warehousing system that processes terabytes of data each day. By utilizing these managed services, ClearCare has been able to go from concept to customer delivery in less than three months.

To learn more about ClearCare, check out their website.

DNAnexus (Mountain View, CA)

DNAnexus is accelerating the application of genomic data in precision medicine by providing a cloud-based platform for sharing and managing genomic and biomedical data and analysis tools. The company was founded in 2009 by Stanford graduate student Andreas Sundquist and two Stanford professors Arend Sidow and Serafim Batzoglou, to address the need for scaling secondary analysis of next-generation sequencing (NGS) data in the cloud. The founders quickly learned that users needed a flexible solution to build complex analysis workflows and tools that enable them to share and manage large volumes of data. DNAnexus is optimized to address the challenges of security, scalability, and collaboration for organizations that are pursuing genomic-based approaches to health, both in clinics and research labs. DNAnexus has a global customer base – spanning North America, Europe, Asia-Pacific, South America, and Africa – that runs a million jobs each month and is doubling their storage year-over-year. The company currently stores more than 10 petabytes of biomedical and genomic data. That is equivalent to approximately 100,000 genomes, or in simpler terms, over 50 billion Facebook photos!

DNAnexus is working with its customers to help expand their translational informatics research, which includes expanding into clinical trial genomic services. This will help companies developing different medicines to better stratify clinical trial populations and develop companion tests that enable the right patient to get the right medicine. In collaboration with Janssen Human Microbiome Institute, DNAnexus is also launching Mosaic – a community platform for microbiome research.

AWS provides DNAnexus and its customers the flexibility to grow and scale research programs. Building the technology infrastructure required to manage these projects in-house is expensive and time-consuming. DNAnexus removes that barrier for labs of any size by using AWS scalable cloud resources. The company deploys its customers’ genomic pipelines on Amazon EC2, using Amazon S3 for high-performance, high-durability storage, and Amazon Glacier for low-cost data archiving. DNAnexus is also an AWS Life Sciences Competency Partner.

Learn more about DNAnexus here.

-Tina

New – Per-Second Billing for EC2 Instances and EBS Volumes

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/new-per-second-billing-for-ec2-instances-and-ebs-volumes/

Back in the old days, you needed to buy or lease a server if you needed access to compute power. When we launched EC2 back in 2006, the ability to use an instance for an hour, and to pay only for that hour, was big news. The pay-as-you-go model inspired our customers to think about new ways to develop, test, and run applications of all types.

Today, services like AWS Lambda prove that we can do a lot of useful work in a short time. Many of our customers are dreaming up applications for EC2 that can make good use of a large number of instances for shorter amounts of time, sometimes just a few minutes.

Per-Second Billing for EC2 and EBS
Effective October 2nd, usage of Linux instances that are launched in On-Demand, Reserved, and Spot form will be billed in one-second increments. Similarly, provisioned storage for EBS volumes will be billed in one-second increments.

Per-second billing also applies to Amazon EMR and AWS Batch:

Amazon EMR – Our customers add capacity to their EMR clusters in order to get their results more quickly. With per-second billing for the EC2 instances in the clusters, adding nodes is more cost-effective than ever.

AWS Batch – Many of the batch jobs that our customers run complete in less than an hour. AWS Batch already launches and terminates Spot Instances; with per-second billing batch processing will become even more economical.

Some of our more sophisticated customers have built systems to get the most value from EC2 by strategically choosing the most advantageous target instances when managing their gaming, ad tech, or 3D rendering fleets. Per-second billing obviates the need for this extra layer of instance management, and brings the costs savings to all customers and all workloads.

While this will result in a price reduction for many workloads (and you know we love price reductions), I don’t think that’s the most important aspect of this change. I believe that this change will inspire you to innovate and to think about your compute-bound problems in new ways. How can you use it to improve your support for continuous integration? Can it change the way that you provision transient environments for your dev and test workloads? What about your analytics, batch processing, and 3D rendering?

One of the many advantages of cloud computing is the elastic nature of provisioning or deprovisioning resources as you need them. By billing usage down to the second we will enable customers to level up their elasticity, save money, and customers will be positioned to take advantage of continuing advances in computing.

Things to Know
This change is effective in all AWS Regions and will be effective October 2, for all Linux instances that are newly launched or already running. Per-second billing is not currently applicable to instances running Microsoft Windows or Linux distributions that have a separate hourly charge. There is a 1 minute minimum charge per-instance.

List prices and Spot Market prices are still listed on a per-hour basis, but bills are calculated down to the second, as is Reserved Instance usage (you can launch, use, and terminate multiple instances within an hour and get the Reserved Instance Benefit for all of the instances). Also, bills will show times in decimal form, like this:

The Dedicated Per Region Fee, EBS Snapshots, and products in AWS Marketplace are still billed on an hourly basis.

Jeff;

 

Implement Continuous Integration and Delivery of Apache Spark Applications using AWS

Post Syndicated from Luis Caro Perez original https://aws.amazon.com/blogs/big-data/implement-continuous-integration-and-delivery-of-apache-spark-applications-using-aws/

When you develop Apache Spark–based applications, you might face some additional challenges when dealing with continuous integration and deployment pipelines, such as the following common issues:

  • Applications must be tested on real clusters using automation tools (live test)
  • Any user or developer must be able to easily deploy and use different versions of both the application and infrastructure to be able to debug, experiment on, and test different functionality.
  • Infrastructure needs to be evaluated and tested along with the application that uses it.

In this post, we walk you through a solution that implements a continuous integration and deployment pipeline supported by AWS services. The pipeline offers the following workflow:

  • Deploy the application to a QA stage after a commit is performed to the source code.
  • Perform a unit test using Spark local mode.
  • Deploy to a dynamically provisioned Amazon EMR cluster and test the Spark application on it
  • Update the application as an AWS Service Catalog product version, allowing a user to deploy any version (commit) of the application on demand.

Solution overview

The following diagram shows the pipeline workflow.

The solution uses AWS CodePipeline, which allows users to orchestrate and automate the build, test, and deploy stages for application source code. The solution consists of a pipeline that contains the following stages:

  • Source: Both the Spark application source code in addition to the AWS CloudFormation template file for deploying the application are committed to version control. In this example, we use AWS CodeCommit. For an example of the application source code, see zip. 
  • Build: In this stage, you use Apache Maven both to generate the application .jar binaries and to execute all of the application unit tests that end with *Spec.scala. In this example, we use AWS CodeBuild, which runs the unit tests given that they are designed to use Spark local mode.
  • QADeploy: In this stage, the .jar file built previously is deployed using the CloudFormation template included with the application source code. All the resources are created in this stage, such as networks, EMR clusters, and so on. 
  • LiveTest: In this stage, you use Apache Maven to execute all the application tests that end with *SpecLive.scala. The tests submit EMR steps to the cluster created as part of the QADeploy step. The tests verify that the steps ran successfully and their results. 
  • LiveTestApproval: This stage is included in case a pipeline administrator approval is required to deploy the application to the next stages. The pipeline pauses in this stage until an administrator manually approves the release. 
  • QACleanup: In this stage, you use an AWS Lambda function to delete the CloudFormation template deployed as part of the QADeploy stage. The function does not affect any resources other than those deployed as part of the QADeploy stage. 
  • DeployProduct: In this stage, you use a Lambda function that creates or updates an AWS Service Catalog product and portfolio. Every time the pipeline releases a change to the application, the AWS Service Catalog product gets a new version, with the commit of the change as the version description. 

Try it out!

Use the provided sample template to get started using this solution. This template creates the pipeline described earlier with all of its stages. It performs an initial commit of the sample Spark application in order to trigger the first release change. To deploy the template, use the following AWS CLI command:

aws cloudformation create-stack  --template-url https://s3.amazonaws.com/aws-bigdata-blog/artifacts/sparkAppDemoForPipeline/emrSparkpipeline.yaml --stack-name emr-spark-pipeline --capabilities CAPABILITY_NAMED_IAM

After the template finishes creating resources, you see the pipeline name on the stack Outputs tab. After that, open the AWS CodePipeline console and select the newly created pipeline.

After a couple of minutes, AWS CodePipeline detects the initial commit applied by the CloudFormation stack and starts the first release.

You can watch how the pipeline goes through the Build, QADeploy, and LiveTest stages until it finally reaches the LiveTestApproval stage.

At this point, you can check the results of the test in the log files of the Build and LiveTest stage jobs on AWS CodeBuild. If you check the CloudFormation console, you see that a new template has been deployed as part of the QADeploy stage.

You can also visit the EMR console and view how the LiveTest stage submitted steps to the EMR cluster.

After performing the review, manually approve the revision on the LiveTestApproval stage by using the AWS CodePipeline console.

After the revision is approved, the pipeline proceeds to use a Lambda function that destroys the resources deployed on the QAdeploy stage. Finally, it creates or updates a product and portfolio in AWS Service Catalog. After the final stage of the pipeline is complete, you can check that the product is created successfully on the AWS Service Catalog console.

You can check the product versions and notice that the first version is the initial commit performed by the CloudFormation template.

You can proceed to share the created portfolio with any users in your AWS account and allow them to deploy any version of the Spark application. You can also perform a commit on the AWS CodeCommit repository. The pipeline is triggered automatically and repeats the pipeline execution to deploy a new version of the product.

To destroy all of the resources created by the stack, make sure all the deployed stacks using AWS Service Catalog or the QAdeploy stage are destroyed. Then, destroy the pipeline template using the following AWS CLI command:

 

aws cloudformation delete-stack --stack-name emr-spark-pipeline

Conclusion

You can use the sample template and Spark application shared in this post and adapt them for the specific needs of your own application. The pipeline can have as many stages as needed and it can be used to automatically deploy to AWS Service Catalog or a production environment using CloudFormation.

If you have questions or suggestions, please comment below.


Additional Reading

Learn how to implement authorization and auditing on Amazon EMR using Apache Ranger.

 


About the Authors

Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.

 

 

Samuel Schmidt is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.

 

 

 

From Data Lake to Data Warehouse: Enhancing Customer 360 with Amazon Redshift Spectrum

Post Syndicated from Dylan Tong original https://aws.amazon.com/blogs/big-data/from-data-lake-to-data-warehouse-enhancing-customer-360-with-amazon-redshift-spectrum/

Achieving a 360o-view of your customer has become increasingly challenging as companies embrace omni-channel strategies, engaging customers across websites, mobile, call centers, social media, physical sites, and beyond. The promise of a web where online and physical worlds blend makes understanding your customers more challenging, but also more important. Businesses that are successful in this medium have a significant competitive advantage.

The big data challenge requires the management of data at high velocity and volume. Many customers have identified Amazon S3 as a great data lake solution that removes the complexities of managing a highly durable, fault tolerant data lake infrastructure at scale and economically.

AWS data services substantially lessen the heavy lifting of adopting technologies, allowing you to spend more time on what matters most—gaining a better understanding of customers to elevate your business. In this post, I show how a recent Amazon Redshift innovation, Redshift Spectrum, can enhance a customer 360 initiative.

Customer 360 solution

A successful customer 360 view benefits from using a variety of technologies to deliver different forms of insights. These could range from real-time analysis of streaming data from wearable devices and mobile interactions to historical analysis that requires interactive, on demand queries on billions of transactions. In some cases, insights can only be inferred through AI via deep learning. Finally, the value of your customer data and insights can’t be fully realized until it is operationalized at scale—readily accessible by fleets of applications. Companies are leveraging AWS for the breadth of services that cover these domains, to drive their data strategy.

A number of AWS customers stream data from various sources into a S3 data lake through Amazon Kinesis. They use Kinesis and technologies in the Hadoop ecosystem like Spark running on Amazon EMR to enrich this data. High-value data is loaded into an Amazon Redshift data warehouse, which allows users to analyze and interact with data through a choice of client tools. Redshift Spectrum expands on this analytics platform by enabling Amazon Redshift to blend and analyze data beyond the data warehouse and across a data lake.

The following diagram illustrates the workflow for such a solution.

This solution delivers value by:

  • Reducing complexity and time to value to deeper insights. For instance, an existing data model in Amazon Redshift may provide insights across dimensions such as customer, geography, time, and product on metrics from sales and financial systems. Down the road, you may gain access to streaming data sources like customer-care call logs and website activity that you want to blend in with the sales data on the same dimensions to understand how web and call center experiences maybe correlated with sales performance. Redshift Spectrum can join these dimensions in Amazon Redshift with data in S3 to allow you to quickly gain new insights, and avoid the slow and more expensive alternative of fully integrating these sources with your data warehouse.
  • Providing an additional avenue for optimizing costs and performance. In cases like call logs and clickstream data where volumes could be many TBs to PBs, storing the data exclusively in S3 yields significant cost savings. Interactive analysis on massive datasets may now be economically viable in cases where data was previously analyzed periodically through static reports generated by inexpensive batch processes. In some cases, you can improve the user experience while simultaneously lowering costs. Spectrum is powered by a large-scale infrastructure external to your Amazon Redshift cluster, and excels at scanning and aggregating large volumes of data. For instance, your analysts maybe performing data discovery on customer interactions across millions of consumers over years of data across various channels. On this large dataset, certain queries could be slow if you didn’t have a large Amazon Redshift cluster. Alternatively, you could use Redshift Spectrum to achieve a better user experience with a smaller cluster.

Proof of concept walkthrough

To make evaluation easier for you, I’ve conducted a Redshift Spectrum proof-of-concept (PoC) for the customer 360 use case. For those who want to replicate the PoC, the instructions, AWS CloudFormation templates, and public data sets are available in the GitHub repository.

The remainder of this post is a journey through the project, observing best practices in action, and learning how you can achieve business value. The walkthrough involves:

  • An analysis of performance data from the PoC environment involving queries that demonstrate blending and analysis of data across Amazon Redshift and S3. Observe that great results are achievable at scale.
  • Guidance by example on query tuning, design, and data preparation to illustrate the optimization process. This includes tuning a query that combines clickstream data in S3 with customer and time dimensions in Amazon Redshift, and aggregates ~1.9 B out of 3.7 B+ records in under 10 seconds with a small cluster!
  • Guidance and measurements to help assess deciding between two options: accessing and analyzing data exclusively in Amazon Redshift, or using Redshift Spectrum to access data left in S3.

Stream ingestion and enrichment

The focus of this post isn’t stream ingestion and enrichment on Kinesis and EMR, but be mindful of performance best practices on S3 to ensure good streaming and query performance:

  • Use random object keys: The data files provided for this project are prefixed with SHA-256 hashes to prevent hot partitions. This is important to ensure that optimal request rates to support PUT requests from the incoming stream in addition to certain queries from large Amazon Redshift clusters that could send a large number of parallel GET requests.
  • Micro-batch your data stream: S3 isn’t optimized for small random write workloads. Your datasets should be micro-batched into large files. For instance, the “parquet-1” dataset provided batches >7 million records per file. The optimal file size for Redshift Spectrum is usually in the 100 MB to 1 GB range.

If you have an edge case that may pose scalability challenges, AWS would love to hear about it. For further guidance, talk to your solutions architect.

Environment

The project consists of the following environment:

  • Amazon Redshift cluster: 4 X dc1.large
  • Data:
    • Time and customer dimension tables are stored on all Amazon Redshift nodes (ALL distribution style):
      • The data originates from the DWDATE and CUSTOMER tables in the Star Schema Benchmark
      • The customer table contains attributes for 3 million customers.
      • The time data is at the day-level granularity, and spans 7 years, from the start of 1992 to the end of 1998.
    • The clickstream data is stored in an S3 bucket, and serves as a fact table.
      • Various copies of this dataset in CSV and Parquet format have been provided, for reasons to be discussed later.
      • The data is a modified version of the uservisits dataset from AMPLab’s Big Data Benchmark, which was generated by Intel’s Hadoop benchmark tools.
      • Changes were minimal, so that existing test harnesses for this test can be adapted:
        • Increased the 751,754,869-row dataset 5X to 3,758,774,345 rows.
        • Added surrogate keys to support joins with customer and time dimensions. These keys were distributed evenly across the entire dataset to represents user visits from six customers over seven years.
        • Values for the visitDate column were replaced to align with the 7-year timeframe, and the added time surrogate key.

Queries across the data lake and data warehouse 

Imagine a scenario where a business analyst plans to analyze clickstream metrics like ad revenue over time and by customer, market segment and more. The example below is a query that achieves this effect: 

The query part highlighted in red retrieves clickstream data in S3, and joins the data with the time and customer dimension tables in Amazon Redshift through the part highlighted in blue. The query returns the total ad revenue for three customers over the last three months, along with info on their respective market segment.

Unfortunately, this query takes around three minutes to run, and doesn’t enable the interactive experience that you want. However, there’s a number of performance optimizations that you can implement to achieve the desired performance.

Performance analysis

Two key utilities provide visibility into Redshift Spectrum:

  • EXPLAIN
    Provides the query execution plan, which includes info around what processing is pushed down to Redshift Spectrum. Steps in the plan that include the prefix S3 are executed on Redshift Spectrum. For instance, the plan for the previous query has the step “S3 Seq Scan clickstream.uservisits_csv10”, indicating that Redshift Spectrum performs a scan on S3 as part of the query execution.
  • SVL_S3QUERY_SUMMARY
    Statistics for Redshift Spectrum queries are stored in this table. While the execution plan presents cost estimates, this table stores actual statistics for past query runs.

You can get the statistics of your last query by inspecting the SVL_S3QUERY_SUMMARY table with the condition (query = pg_last_query_id()). Inspecting the previous query reveals that the entire dataset of nearly 3.8 billion rows was scanned to retrieve less than 66.3 million rows. Improving scan selectivity in your query could yield substantial performance improvements.

Partitioning

Partitioning is a key means to improving scan efficiency. In your environment, the data and tables have already been organized, and configured to support partitions. For more information, see the PoC project setup instructions. The clickstream table was defined as:

CREATE EXTERNAL TABLE clickstream.uservisits_csv10
…
PARTITIONED BY(customer int4, visitYearMonth int4)

The entire 3.8 billion-row dataset is organized as a collection of large files where each file contains data exclusive to a particular customer and month in a year. This allows you to partition your data into logical subsets by customer and year/month. With partitions, the query engine can target a subset of files:

  • Only for specific customers
  • Only data for specific months
  • A combination of specific customers and year/months

You can use partitions in your queries. Instead of joining your customer data on the surrogate customer key (that is, c.c_custkey = uv.custKey), the partition key “customer” should be used instead:

SELECT c.c_name, c.c_mktsegment, t.prettyMonthYear, SUM(uv.adRevenue)
…
ON c.c_custkey = uv.customer
…
ORDER BY c.c_name, c.c_mktsegment, uv.yearMonthKey  ASC

This query should run approximately twice as fast as the previous query. If you look at the statistics for this query in SVL_S3QUERY_SUMMARY, you see that only half the dataset was scanned. This is expected because your query is on three out of six customers on an evenly distributed dataset. However, the scan is still inefficient, and you can benefit from using your year/month partition key as well:

SELECT c.c_name, c.c_mktsegment, t.prettyMonthYear, SUM(uv.adRevenue)
…
ON c.c_custkey = uv.customer
…
ON uv.visitYearMonth = t.d_yearmonthnum
…
ORDER BY c.c_name, c.c_mktsegment, uv.visitYearMonth ASC

All joins between the tables are now using partitions. Upon reviewing the statistics for this query, you should observe that Redshift Spectrum scans and returns the exact number of rows, 66,270,117. If you run this query a few times, you should see execution time in the range of 8 seconds, which is a 22.5X improvement on your original query!

Predicate pushdown and storage optimizations 

Previously, I mentioned that Redshift Spectrum performs processing through large-scale infrastructure external to your Amazon Redshift cluster. It is optimized for performing large scans and aggregations on S3. In fact, Redshift Spectrum may even out-perform a medium size Amazon Redshift cluster on these types of workloads with the proper optimizations. There are two important variables to consider for optimizing large scans and aggregations:

  • File size and count. As a general rule, use files 100 MB-1 GB in size, as Redshift Spectrum and S3 are optimized for reading this object size. However, the number of files operating on a query is directly correlated with the parallelism achievable by a query. There is an inverse relationship between file size and count: the bigger the files, the fewer files there are for the same dataset. Consequently, there is a trade-off between optimizing for object read performance, and the amount of parallelism achievable on a particular query. Large files are best for large scans as the query likely operates on sufficiently large number of files. For queries that are more selective and for which fewer files are operating, you may find that smaller files allow for more parallelism.
  • Data format. Redshift Spectrum supports various data formats. Columnar formats like Parquet can sometimes lead to substantial performance benefits by providing compression and more efficient I/O for certain workloads. Generally, format types like Parquet should be used for query workloads involving large scans, and high attribute selectivity. Again, there are trade-offs as formats like Parquet require more compute power to process than plaintext. For queries on smaller subsets of data, the I/O efficiency benefit of Parquet is diminished. At some point, Parquet may perform the same or slower than plaintext. Latency, compression rates, and the trade-off between user experience and cost should drive your decision.

To help illustrate how Redshift Spectrum performs on these large aggregation workloads, run a basic query that aggregates the entire ~3.7 billion record dataset on Redshift Spectrum, and compared that with running the query exclusively on Amazon Redshift:

SELECT uv.custKey, COUNT(uv.custKey)
FROM <your clickstream table> as uv
GROUP BY uv.custKey
ORDER BY uv.custKey ASC

For the Amazon Redshift test case, the clickstream data is loaded, and distributed evenly across all nodes (even distribution style) with optimal column compression encodings prescribed by the Amazon Redshift’s ANALYZE command.

The Redshift Spectrum test case uses a Parquet data format with each file containing all the data for a particular customer in a month. This results in files mostly in the range of 220-280 MB, and in effect, is the largest file size for this partitioning scheme. If you run tests with the other datasets provided, you see that this data format and size is optimal and out-performs others by ~60X. 

Performance differences will vary depending on the scenario. The important takeaway is to understand the testing strategy and the workload characteristics where Redshift Spectrum is likely to yield performance benefits. 

The following chart compares the query execution time for the two scenarios. The results indicate that you would have to pay for 12 X DC1.Large nodes to get performance comparable to using a small Amazon Redshift cluster that leverages Redshift Spectrum. 

Chart showing simple aggregation on ~3.7 billion records

So you’ve validated that Spectrum excels at performing large aggregations. Could you benefit by pushing more work down to Redshift Spectrum in your original query? It turns out that you can, by making the following modification:

The clickstream data is stored at a day-level granularity for each customer while your query rolls up the data to the month level per customer. In the earlier query that uses the day/month partition key, you optimized the query so that it only scans and retrieves the data required, but the day level data is still sent back to your Amazon Redshift cluster for joining and aggregation. The query shown here pushes aggregation work down to Redshift Spectrum as indicated by the query plan:

In this query, Redshift Spectrum aggregates the clickstream data to the month level before it is returned to the Amazon Redshift cluster and joined with the dimension tables. This query should complete in about 4 seconds, which is roughly twice as fast as only using the partition key. The speed increase is evident upon reviewing the SVL_S3QUERY_SUMMARY table:

  • Bytes scanned is 21.6X less because of the Parquet data format.
  • Only 90 records are returned back to the Amazon Redshift cluster as a result of the push-down, instead of ~66.2 million, leading to substantially less join overhead, and about 530 MB less data sent back to your cluster.
  • No adverse change in average parallelism.

Assessing the value of Amazon Redshift vs. Redshift Spectrum

At this point, you might be asking yourself, why would I ever not use Redshift Spectrum? Well, you still get additional value for your money by loading data into Amazon Redshift, and querying in Amazon Redshift vs. querying S3.

In fact, it turns out that the last version of our query runs even faster when executed exclusively in native Amazon Redshift, as shown in the following chart:

Chart comparing Amazon Redshift vs. Redshift Spectrum with pushdown aggregation over 3 months of data

As a general rule, queries that aren’t dominated by I/O and which involve multiple joins are better optimized in native Amazon Redshift. For instance, the performance difference between running the partition key query entirely in Amazon Redshift versus with Redshift Spectrum is twice as large as that that of the pushdown aggregation query, partly because the former case benefits more from better join performance.

Furthermore, the variability in latency in native Amazon Redshift is lower. For use cases where you have tight performance SLAs on queries, you may want to consider using Amazon Redshift exclusively to support those queries.

On the other hand, when you perform large scans, you could benefit from the best of both worlds: higher performance at lower cost. For instance, imagine that you wanted to enable your business analysts to interactively discover insights across a vast amount of historical data. In the example below, the pushdown aggregation query is modified to analyze seven years of data instead of three months:

SELECT c.c_name, c.c_mktsegment, t.prettyMonthYear, uv.totalRevenue
…
WHERE customer <= 3 and visitYearMonth >= 199201
… 
FROM dwdate WHERE d_yearmonthnum >= 199201) as t
…
ORDER BY c.c_name, c.c_mktsegment, uv.visitYearMonth ASC

This query requires scanning and aggregating nearly 1.9 billion records. As shown in the chart below, Redshift Spectrum substantially speeds up this query. A large Amazon Redshift cluster would have to be provisioned to support this use case. With the aid of Redshift Spectrum, you could use an existing small cluster, keep a single copy of your data in S3, and benefit from economical, durable storage while only paying for what you use via the pay per query pricing model.

Chart comparing Amazon Redshift vs. Redshift Spectrum with pushdown aggregation over 7 years of data

Summary

Redshift Spectrum lowers the time to value for deeper insights on customer data queries spanning the data lake and data warehouse. It can enable interactive analysis on datasets in cases that weren’t economically practical or technically feasible before.

There are cases where you can get the best of both worlds from Redshift Spectrum: higher performance at lower cost. However, there are still latency-sensitive use cases where you may want native Amazon Redshift performance. For more best practice tips, see the 10 Best Practices for Amazon Redshift post.

Please visit the Amazon Redshift Spectrum PoC Environment Github page. If you have questions or suggestions, please comment below.

 


Additional Reading

Learn more about how Amazon Redshift Spectrum extends data warehousing out to exabytes – no loading required.


About the Author

Dylan Tong is an Enterprise Solutions Architect at AWS. He works with customers to help drive their success on the AWS platform through thought leadership and guidance on designing well architected solutions. He has spent most of his career building on his expertise in data management and analytics by working for leaders and innovators in the space.

 

 

AWS Summit New York – Summary of Announcements

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/aws-summit-new-york-summary-of-announcements/

Whew – what a week! Tara, Randall, Ana, and I have been working around the clock to create blog posts for the announcements that we made at the AWS Summit in New York. Here’s a summary to help you to get started:

Amazon Macie – This new service helps you to discover, classify, and secure content at scale. Powered by machine learning and making use of Natural Language Processing (NLP), Macie looks for patterns and alerts you to suspicious behavior, and can help you with governance, compliance, and auditing. You can read Tara’s post to see how to put Macie to work; you select the buckets of interest, customize the classification settings, and review the results in the Macie Dashboard.

AWS GlueRandall’s post (with deluxe animated GIFs) introduces you to this new extract, transform, and load (ETL) service. Glue is serverless and fully managed, As you can see from the post, Glue crawls your data, infers schemas, and generates ETL scripts in Python. You define jobs that move data from place to place, with a wide selection of transforms, each expressed as code and stored in human-readable form. Glue uses Development Endpoints and notebooks to provide you with a testing environment for the scripts you build. We also announced that Amazon Athena now integrates with Amazon Glue, as does Apache Spark and Hive on Amazon EMR.

AWS Migration Hub – This new service will help you to migrate your application portfolio to AWS. My post outlines the major steps and shows you how the Migration Hub accelerates, tracks,and simplifies your migration effort. You can begin with a discovery step, or you can jump right in and migrate directly. Migration Hub integrates with tools from our migration partners and builds upon the Server Migration Service and the Database Migration Service.

CloudHSM Update – We made a major upgrade to AWS CloudHSM, making the benefits of hardware-based key management available to a wider audience. The service is offered on a pay-as-you-go basis, and is fully managed. It is open and standards compliant, with support for multiple APIs, programming languages, and cryptography extensions. CloudHSM is an integral part of AWS and can be accessed from the AWS Management Console, AWS Command Line Interface (CLI), and through API calls. Read my post to learn more and to see how to set up a CloudHSM cluster.

Managed Rules to Secure S3 Buckets – We added two new rules to AWS Config that will help you to secure your S3 buckets. The s3-bucket-public-write-prohibited rule identifies buckets that have public write access and the s3-bucket-public-read-prohibited rule identifies buckets that have global read access. As I noted in my post, you can run these rules in response to configuration changes or on a schedule. The rules make use of some leading-edge constraint solving techniques, as part of a larger effort to use automated formal reasoning about AWS.

CloudTrail for All Customers – Tara’s post revealed that AWS CloudTrail is now available and enabled by default for all AWS customers. As a bonus, Tara reviewed the principal benefits of CloudTrail and showed you how to review your event history and to deep-dive on a single event. She also showed you how to create a second trail, for use with CloudWatch CloudWatch Events.

Encryption of Data at Rest for EFS – When you create a new file system, you now have the option to select a key that will be used to encrypt the contents of the files on the file system. The encryption is done using an industry-standard AES-256 algorithm. My post shows you how to select a key and to verify that it is being used.

Watch the Keynote
My colleagues Adrian Cockcroft and Matt Wood talked about these services and others on the stage, and also invited some AWS customers to share their stories. Here’s the video:

Jeff;

 

Turbocharge your Apache Hive queries on Amazon EMR using LLAP

Post Syndicated from Jigar Mistry original https://aws.amazon.com/blogs/big-data/turbocharge-your-apache-hive-queries-on-amazon-emr-using-llap/

Apache Hive is one of the most popular tools for analyzing large datasets stored in a Hadoop cluster using SQL. Data analysts and scientists use Hive to query, summarize, explore, and analyze big data.

With the introduction of Hive LLAP (Low Latency Analytical Processing), the notion of Hive being just a batch processing tool has changed. LLAP uses long-lived daemons with intelligent in-memory caching to circumvent batch-oriented latency and provide sub-second query response times.

This post provides an overview of Hive LLAP, including its architecture and common use cases for boosting query performance. You will learn how to install and configure Hive LLAP on an Amazon EMR cluster and run queries on LLAP daemons.

What is Hive LLAP?

Hive LLAP was introduced in Apache Hive 2.0, which provides very fast processing of queries. It uses persistent daemons that are deployed on a Hadoop YARN cluster using Apache Slider. These daemons are long-running and provide functionality such as I/O with DataNode, in-memory caching, query processing, and fine-grained access control. And since the daemons are always running in the cluster, it saves substantial overhead of launching new YARN containers for every new Hive session, thereby avoiding long startup times.

When Hive is configured in hybrid execution mode, small and short queries execute directly on LLAP daemons. Heavy lifting (like large shuffles in the reduce stage) is performed in YARN containers that belong to the application. Resources (CPU, memory, etc.) are obtained in a traditional fashion using YARN. After the resources are obtained, the execution engine can decide which resources are to be allocated to LLAP, or it can launch Apache Tez processors in separate YARN containers. You can also configure Hive to run all the processing workloads on LLAP daemons for querying small datasets at lightning fast speeds.

LLAP daemons are launched under YARN management to ensure that the nodes don’t get overloaded with the compute resources of these daemons. You can use scheduling queues to make sure that there is enough compute capacity for other YARN applications to run.

Why use Hive LLAP?

With many options available in the market (Presto, Spark SQL, etc.) for doing interactive SQL  over data that is stored in Amazon S3 and HDFS, there are several reasons why using Hive and LLAP might be a good choice:

  • For those who are heavily invested in the Hive ecosystem and have external BI tools that connect to Hive over JDBC/ODBC connections, LLAP plugs in to their existing architecture without a steep learning curve.
  • It’s compatible with existing Hive SQL and other Hive tools, like HiveServer2, and JDBC drivers for Hive.
  • It has native support for security features with authentication and authorization (SQL standards-based authorization) using HiveServer2.
  • LLAP daemons are aware about of the columns and records that are being processed which enables you to enforce fine-grained access control.
  • It can use Hive’s vectorization capabilities to speed up queries, and Hive has better support for Parquet file format when vectorization is enabled.
  • It can take advantage of a number of Hive optimizations like merging multiple small files for query results, automatically determining the number of reducers for joins and groupbys, etc.
  • It’s optional and modular so it can be turned on or off depending on the compute and resource requirements of the cluster. This lets you to run other YARN applications concurrently without reserving a cluster specifically for LLAP.

How do you install Hive LLAP in Amazon EMR?

To install and configure LLAP on an EMR cluster, use the following bootstrap action (BA):

s3://aws-bigdata-blog/artifacts/Turbocharge_Apache_Hive_on_EMR/configure-Hive-LLAP.sh

This BA downloads and installs Apache Slider on the cluster and configures LLAP so that it works with EMR Hive. For LLAP to work, the EMR cluster must have Hive, Tez, and Apache Zookeeper installed.

You can pass the following arguments to the BA.

Argument Definition Default value
--instances Number of instances of LLAP daemon Number of core/task nodes of the cluster
--cache Cache size per instance 20% of physical memory of the node
--executors Number of executors per instance Number of CPU cores of the node
--iothreads Number of IO threads per instance Number of CPU cores of the node
--size Container size per instance 50% of physical memory of the node
--xmx Working memory size 50% of container size
--log-level Log levels for the LLAP instance INFO

LLAP example

This section describes how you can try the faster Hive queries with LLAP using the TPC-DS testbench for Hive on Amazon EMR.

Use the following AWS command line interface (AWS CLI) command to launch a 1+3 nodes m4.xlarge EMR 5.6.0 cluster with the bootstrap action to install LLAP:

aws emr create-cluster --release-label emr-5.6.0 \
--applications Name=Hadoop Name=Hive Name=Hue Name=ZooKeeper Name=Tez \
--bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/Turbocharge_Apache_Hive_on_EMR/configure-Hive-LLAP.sh","Name":"Custom action"}]' \ 
--ec2-attributes '{"KeyName":"<YOUR-KEY-PAIR>","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-xxxxxxxx","EmrManagedSlaveSecurityGroup":"sg-xxxxxxxx","EmrManagedMasterSecurityGroup":"sg-xxxxxxxx"}' 
--service-role EMR_DefaultRole \
--enable-debugging \
--log-uri 's3n://<YOUR-BUCKET/' --name 'test-hive-llap' \
--instance-groups '[{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}],"EbsOptimized":true},"InstanceGroupType":"MASTER","InstanceType":"m4.xlarge","Name":"Master - 1"},{"InstanceCount":3,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}],"EbsOptimized":true},"InstanceGroupType":"CORE","InstanceType":"m4.xlarge","Name":"Core - 2"}]' 
--region us-east-1

After the cluster is launched, log in to the master node using SSH, and do the following:

  1. Open the hive-tpcds folder:
    cd /home/hadoop/hive-tpcds/
  2. Start Hive CLI using the testbench configuration, create the required tables, and run the sample query:

    hive –i testbench.settings
    hive> source create_tables.sql;
    hive> source query55.sql;

    This sample query runs on a 40 GB dataset that is stored on Amazon S3. The dataset is generated using the data generation tool in the TPC-DS testbench for Hive.It results in output like the following:
  3. This screenshot shows that the query finished in about 47 seconds for LLAP mode. Now, to compare this to the execution time without LLAP, you can run the same workload using only Tez containers:
    hive> set hive.llap.execution.mode=none;
    hive> source query55.sql;


    This query finished in about 80 seconds.

The difference in query execution time is almost 1.7 times when using just YARN containers in contrast to running the query on LLAP daemons. And with every rerun of the query, you notice that the execution time substantially decreases by the virtue of in-memory caching by LLAP daemons.

Conclusion

In this post, I introduced Hive LLAP as a way to boost Hive query performance. I discussed its architecture and described several use cases for the component. I showed how you can install and configure Hive LLAP on an Amazon EMR cluster and how you can run queries on LLAP daemons.

If you have questions about using Hive LLAP on Amazon EMR or would like to share your use cases, please leave a comment below.


Additional Reading

Learn how to to automatically partition Hive external tables with AWS.


About the Author

Jigar Mistry is a Hadoop Systems Engineer with Amazon Web Services. He works with customers to provide them architectural guidance and technical support for processing large datasets in the cloud using open-source applications. In his spare time, he enjoys going for camping and exploring different restaurants in the Seattle area.