All posts by Roy Hasson

Separating queries and managing costs using Amazon Athena workgroups

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/separating-queries-and-managing-costs-using-amazon-athena-workgroups/

Amazon Athena is a serverless query engine for data on Amazon S3. Many customers use Athena to query application and service logs, schedule automated reports, and integrate with their applications, enabling new analytics-based capabilities.

Different types of users rely on Athena, including business analysts, data scientists, security, and operations engineers. But how do you separate and manage these workloads so that users get the best experience while minimizing costs?

In this post, I show you how to use workgroups to do the following:

  • Separate workloads.
  • Control user access.
  • Manage query usage and costs.

Separate workloads

By default, all Athena queries execute in the primary workgroup.  As an administrator, you can create new workgroups to separate different types of workloads.  Administrators commonly turn to workgroups to separate analysts running ad hoc queries from automated reports.  Here’s how to build out that separation.

First create two workgroups, one for ad hoc users (ad-hoc-users) and another for automated reports (reporting).

Next, select a specific output location. All queries executed inside this workgroup save their results to this output location. Routing results to a single secure location helps make sure users only access data they are permitted to see. You can also enforce encryption of query results in S3 by selecting the appropriate encryption configuration.

Workgroups also help you simplify the onboarding of new users to Athena. By selecting override client-side settings, you enforce a predefined configuration on all queries within a workgroup. Users no longer have to configure a query results output location or S3 encryption keys. These settings default to the parameters defined for the workgroup where those queries execute. Additionally, each workgroup maintains a unique query history and saved query inventory, making queries easier for you to track down.

Finally, when creating a workgroup, you can add up to 50 key-value pair tags to help identify your workgroup resources. Tags are also useful when attempting to allocate Athena costs between groups of users. Create Name and Dept tags for the ad-hoc-users and reporting workgroups with their name and department association.
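
If you prefer to script this setup, the console steps above map to a single CreateWorkGroup API call. The following sketch uses the AWS SDK for JavaScript; the region, bucket path, and tag values are placeholders for illustration.

// Minimal sketch: create the ad-hoc-users workgroup with an enforced result
// location, S3-managed encryption, CloudWatch metrics, and cost allocation tags.
const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

const params = {
  Name: 'ad-hoc-users',
  Description: 'Workgroup for analysts running ad hoc queries',
  Configuration: {
    ResultConfiguration: {
      OutputLocation: 's3://demo/workgroups/adhocusers/',        // placeholder bucket
      EncryptionConfiguration: { EncryptionOption: 'SSE_S3' }
    },
    EnforceWorkGroupConfiguration: true,                         // override client-side settings
    PublishCloudWatchMetricsEnabled: true
  },
  Tags: [
    { Key: 'Name', Value: 'ad-hoc-users' },
    { Key: 'Dept', Value: 'Analytics' }                          // placeholder department
  ]
}

athena.createWorkGroup(params, (err) => {
  if (err) console.error(err)
  else console.log('Workgroup ad-hoc-users created')
})

Repeat the same call with a different Name, OutputLocation, and Dept value to create the reporting workgroup.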

Control user access to workgroups

Now that you have two workgroups defined, ad-hoc-users and reporting, you must control who can use and update them.  Remember that each workgroup is a resource with its own ARN, which you can reference in the IAM policies that you attach to your users.  In this example, create a single IAM user representing the team of ad hoc users and add that user to an IAM group. The group contains a policy that defines which actions these users can perform.

Start by reviewing IAM Policies for Accessing Workgroups and Workgroup Example Policies to familiarize yourself with policy options. Use the following IAM policy to set up permissions for your analyst user. Grant this user only the permissions required for working in the ad-hoc-users workgroup. Make sure that you tweak this policy to match your exact needs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:GetQueryExecutions",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup",
                "athena:ListTagsForResource"
            ],
            "Resource": "arn:aws:athena:us-east-1:112233445566:workgroup/ad-hoc-users"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObjectAcl",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": "arn:aws:s3:::demo/workgroups/adhocusers/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:Get*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:112233445566:catalog",
                "arn:aws:glue:us-east-1:112233445566:database/amazon",
                "arn:aws:glue:us-east-1:112233445566:table/amazon/*"
            ]
        }
    ]
}

Now your analyst user can execute queries only in the ad-hoc-users workgroup. The analyst user can switch to other workgroups, but they lose access when they try to perform any action. They are further restricted to listing and querying only the tables that belong to the amazon database. For more information about controlling access to AWS Glue resources such as databases and tables, see AWS Glue Resource Policies for Access Control.

The following screenshot shows what the analyst user sees in the Athena console:

I’ve created a simple Node.js tool that executes SQL queries stored as files in a given directory. You can find my Athena test runner code in the athena_test_runner GitHub repo. You can use this code to simulate a reporting tool, after configuring it to use a workgroup. To do that, create an IAM role with permissions like those previously defined for the analyst user. This time, restrict access to the reporting workgroup.
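
As a rough sketch of that role setup with the AWS SDK for JavaScript: the role name and EC2 trust policy below are illustrative assumptions, and the inline policy is trimmed to the core Athena actions (add the S3 and AWS Glue statements from the analyst policy as needed).

// Hypothetical role for the reporting tool, limited to the reporting workgroup.
const AWS = require('aws-sdk')
const iam = new AWS.IAM()

const trustPolicy = {
  Version: '2012-10-17',
  Statement: [{ Effect: 'Allow', Principal: { Service: 'ec2.amazonaws.com' }, Action: 'sts:AssumeRole' }]  // assumed trust
}

const reportingPolicy = {
  Version: '2012-10-17',
  Statement: [{
    Effect: 'Allow',
    Action: [
      'athena:StartQueryExecution',
      'athena:GetQueryExecution',
      'athena:GetQueryResults',
      'athena:GetWorkGroup'
    ],
    Resource: 'arn:aws:athena:us-east-1:112233445566:workgroup/reporting'
  }]
}

async function createReportingRole() {
  await iam.createRole({
    RoleName: 'athena-reporting-role',                 // placeholder name
    AssumeRolePolicyDocument: JSON.stringify(trustPolicy)
  }).promise()

  await iam.putRolePolicy({
    RoleName: 'athena-reporting-role',
    PolicyName: 'athena-reporting-workgroup-access',
    PolicyDocument: JSON.stringify(reportingPolicy)
  }).promise()
}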

The following JavaScript code example shows how to select a workgroup programmatically when executing queries:

// Requires the AWS SDK for JavaScript. The workgroup's enforced settings
// supply the query result location, so OutputLocation can remain empty.
const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

function executeQueries(files) {
    // files: directory of SQL files; iteration is handled elsewhere in the repo.
    const params =
    {
      "QueryString": "",
      "ResultConfiguration": {
        "OutputLocation": ""
      },
      "QueryExecutionContext": {
        "Database": "default"
      },
      "WorkGroup": "reporting"   // run the query in the reporting workgroup
    }

    params.QueryString = "SELECT * FROM amazon.final_parquet LIMIT 10"
    return new Promise((resolve, reject) => {
        athena.startQueryExecution(params, (err, results) => {
            if (err) {
                reject(err.message)
            } else {
                resolve(results)
            }
        })
    })
}

Run sample automated reports under the reporting workgroup, with the following command:

node index.js testsuite/

Query histories remain isolated between workgroups. A user logging into the Athena console as an analyst using the ad-hoc-users workgroup doesn’t see any automated reports that you ran under the reporting workgroup.

Manage query usage and costs

You have two workgroups configured: one for ad hoc users and another for automated reports. Now, you must safeguard against bad queries. In this use case, there are two query usage situations to monitor and control:

  • Make sure that users don’t run queries that scan more data than allowed by their budget.
  • Safeguard against automated script bugs that could cause runaway query execution.

First, configure data usage controls for your ad-hoc-users workgroup. There are two types of data usage controls: per-query and per-workgroup.

Set the per-query control for analysts to be 1 GB. This control cancels any query run in the ad-hoc-users workgroup that tries to scan more than 1 GB.
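
You can apply the same limit programmatically with the UpdateWorkGroup API. A minimal sketch, with 1 GB expressed in bytes:

// Set a 1 GB per-query data usage control on the ad-hoc-users workgroup.
const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

athena.updateWorkGroup({
  WorkGroup: 'ad-hoc-users',
  ConfigurationUpdates: {
    BytesScannedCutoffPerQuery: 1073741824   // 1 GB
  }
}, (err) => {
  if (err) console.error(err)
  else console.log('Per-query limit applied')
})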

To observe this limit in action, choose Update, return to the query editor, and run a query that would scan more than 1 GB. This query triggers the error message, “Query cancelled! : Bytes scanned limit was exceeded”. Remember that you incur charges for data the query scanned up to the point of cancellation. In this case, you incur charges for 1 GB of data.

Now, switch to your reporting workgroup. For this workload, you’re not worried about individual queries scanning too much data. However, you want to control the aggregate amount of data scanned of all queries in this workgroup.

Create a per-workgroup data usage control for the reporting workgroup. You can configure the maximum amount of data scanned by all queries in the workgroup during a specific period.

For the automated reporting workload, you probably have a good idea of how long the process should take and the total amount of data that queries scan during this time. You only have a few reports to run, so you can expect them to run in a few minutes, only scanning a few megabytes of data. Begin by setting up a low watermark alarm to notify you when your queries have scanned more data than you would expect in five minutes. The following example is for demo purposes only. In most cases, this period would be longer. I configured the alarm to send a notification to an Amazon SNS topic that I created.
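
If you want to reproduce a similar alert outside the Athena console, you can put a CloudWatch alarm on the workgroup's aggregated bytes-scanned metric yourself. The sketch below assumes the ProcessedBytes metric in the AWS/Athena namespace, a placeholder 100 MB threshold, and an existing SNS topic ARN:

// Low watermark alarm: notify when reporting queries scan more than ~100 MB
// (placeholder threshold) within a 5-minute period.
const AWS = require('aws-sdk')
const cloudwatch = new AWS.CloudWatch({region: 'us-east-1'})

cloudwatch.putMetricAlarm({
  AlarmName: 'reporting-workgroup-low-watermark',
  Namespace: 'AWS/Athena',
  MetricName: 'ProcessedBytes',                      // assumed metric name
  Dimensions: [{ Name: 'WorkGroup', Value: 'reporting' }],
  Statistic: 'Sum',
  Period: 300,                                       // 5 minutes
  EvaluationPeriods: 1,
  Threshold: 100 * 1024 * 1024,
  ComparisonOperator: 'GreaterThanThreshold',
  AlarmActions: ['arn:aws:sns:us-east-1:112233445566:athena-alerts']   // placeholder topic
}, (err) => {
  if (err) console.error(err)
})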

To validate the alarm, I made a minor change to my test queries, causing them to scan more data. This change triggered the SNS alarm, shown in the following Amazon CloudWatch dashboard:

Next, create a high watermark alarm that is triggered when the queries in your reporting workgroup exceed 1 GB of data over 15 minutes. In this case, the alarm triggers an AWS Lambda function that disables the workgroup, making sure that no additional queries execute in it. This alarm protects you from costs incurred by faulty automation code or runaway queries.

Before creating the data usage control, create a Node.js Lambda function to disable the workgroup. Paste in the following code:

const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

exports.handler = async (event) => {
    // The CloudWatch alarm's SNS message carries the workgroup name as a dimension.
    let msg = JSON.parse(event.Records[0].Sns.Message)
    let wgname = msg.Trigger.Dimensions.filter((i) => i.name == 'WorkGroup')[0].value

    // Disable the workgroup so that no further queries can run in it.
    await athena.updateWorkGroup({WorkGroup: wgname, State: 'DISABLED'}).promise()

    const response = {
        statusCode: 200,
        body: JSON.stringify(`Workgroup ${wgname} has been disabled`),
    };
    return response;
}

This code grabs the workgroup name from the SNS message body and calls the UpdateWorkGroup API action with the workgroup name and a state of DISABLED. The workgroup APIs require a recent version of the AWS SDK, so when you create the Lambda deployment bundle, include the latest AWS SDK version in it.

Next, create a new SNS topic and a subscription. For Protocol, select AWS Lambda. Then, select the Lambda function that you created in the previous step.
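
The same wiring can be scripted with the SDK. In this sketch, the topic name and the Lambda function ARN are placeholders:

// Create the topic, allow SNS to invoke the Lambda function, and subscribe it.
const AWS = require('aws-sdk')
const sns = new AWS.SNS({region: 'us-east-1'})
const lambda = new AWS.Lambda({region: 'us-east-1'})

async function wireAlarmToLambda() {
  const topic = await sns.createTopic({ Name: 'athena-workgroup-kill-switch' }).promise()
  const functionArn = 'arn:aws:lambda:us-east-1:112233445566:function:disableWorkgroup'  // placeholder

  await lambda.addPermission({
    FunctionName: functionArn,
    StatementId: 'sns-invoke',
    Action: 'lambda:InvokeFunction',
    Principal: 'sns.amazonaws.com',
    SourceArn: topic.TopicArn
  }).promise()

  await sns.subscribe({
    TopicArn: topic.TopicArn,
    Protocol: 'lambda',
    Endpoint: functionArn
  }).promise()

  return topic.TopicArn
}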

In the Athena console, create the second alarm, 1 GB for 15 min., and point it to the SNS topic that you created earlier. When triggered, this SNS topic calls the Lambda function that disables the reporting workgroup. No more queries can execute in this workgroup. You see this error message in the console when a workgroup is disabled:

Athena exposes other aggregated metrics per workgroup under the AWS/Athena namespace in CloudWatch, such as the query status and the query type (DDL or DML) per workgroup. To learn more, see Monitoring Athena Queries with CloudWatch Metrics.
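
As a quick example of reading those metrics, the following sketch sums the bytes scanned by the reporting workgroup over the last hour. It assumes the ProcessedBytes metric and the WorkGroup dimension:

// Retrieve the total bytes scanned by the reporting workgroup in the last hour.
const AWS = require('aws-sdk')
const cloudwatch = new AWS.CloudWatch({region: 'us-east-1'})

cloudwatch.getMetricStatistics({
  Namespace: 'AWS/Athena',
  MetricName: 'ProcessedBytes',
  Dimensions: [{ Name: 'WorkGroup', Value: 'reporting' }],
  StartTime: new Date(Date.now() - 60 * 60 * 1000),
  EndTime: new Date(),
  Period: 3600,
  Statistics: ['Sum']
}, (err, data) => {
  if (err) console.error(err)
  else console.log(data.Datapoints)
})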

Cost allocation tags

When you created your ad-hoc-users and reporting workgroups, you added Name and Dept tags. These tags can be used in your Billing and Cost Management console to determine the usage per workgroup.

Summary

In this post, you learned how to use workgroups in Athena to isolate different query workloads, manage access, and define data usage controls to protect yourself from runaway queries. Metrics exposed to CloudWatch help you monitor query performance and make sure that your users are getting the best experience possible. For more details, see Using Workgroups to Control Query Access.

About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan and enjoys cheering his team on and hanging out with his family.


Using CTAS statements with Amazon Athena to reduce cost and improve performance

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/using-ctas-statements-with-amazon-athena-to-reduce-cost-and-improve-performance/

Amazon Athena is an interactive query service that makes it more efficient to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena recently released support for creating tables using the results of a SELECT query or CREATE TABLE AS SELECT (CTAS) statement. Analysts can use CTAS statements to create new tables from existing tables on a subset of data, or a subset of columns. They also have options to convert the data into columnar formats, such as Apache Parquet and Apache ORC, and partition it. Athena automatically adds the resultant table and partitions to the AWS Glue Data Catalog, making them immediately available for subsequent queries.

CTAS statements help reduce cost and improve performance by allowing users to run queries on smaller tables constructed from larger tables. This post covers three use cases that demonstrate the benefit of using CTAS to create a new dataset, smaller than the original one, allowing subsequent queries to run faster. Assuming our use case requires repeatedly querying the data, we can now query a smaller and more optimal dataset to get the results faster.

Using Amazon Athena CTAS

The familiar CREATE TABLE statement creates an empty table. In contrast, the CTAS statement creates a new table containing the result of a SELECT query. The new table’s metadata is automatically added to the AWS Glue Data Catalog. The data files are stored in Amazon S3 at the designated location. When creating new tables using CTAS, you can include a WITH statement to define table-specific parameters, such as file format, compression, and partition columns. For more information about the parameters you can use, see Creating a Table from Query Results (CTAS).

Before you begin: Set up CloudTrail for querying with Athena

If you don’t already use Athena to query your AWS CloudTrail data, we recommend you set this up. To do so:

  1. Open the CloudTrail console.
  2. On the left side of the console, choose Event History.
  3. At the top of the window, choose Run advanced queries in Amazon Athena.
  4. Follow the setup wizard and create your Athena table.

It takes some time for data to accumulate. If this is your first time setting this up, allow about an hour to get meaningful data, assuming there is activity in your AWS account.

This post assumes that your CloudTrail table is named cloudtrail_logs, and that it resides in the default database.

Use case 1: Optimizing for repeated queries by reducing dataset size

As with other AWS services, Athena uses AWS CloudTrail to track its API calls. In this use case, we use CloudTrail to gain insight into our Athena usage. CloudTrail automatically publishes data in JSON format to S3. We use a CTAS statement to create a table with only 30 days of Athena API events, removing all of the other API events that we don’t care about.  This reduces the table size, which makes subsequent queries faster and cheaper.

The following query uses the last 30 days of Athena events. It creates a new table called “athena_30_days” and saves the data files in Parquet format.

CREATE TABLE athena_30_days
AS
SELECT
  date(from_iso8601_timestamp(eventtime)) AS dt,
  *
FROM cloudtrail_logs
WHERE eventsource = 'athena.amazonaws.com'
AND 
  date(from_iso8601_timestamp(eventtime)) 
    BETWEEN current_date - interval '30' day AND current_date

Executing this query on the original CloudTrail data takes close to 5 minutes to run and scans around 14 GB of data. This is because the raw data is in JSON format and not well partitioned.  Executing a SELECT * on the newly created table now takes 1.7 seconds and scans 1.14 MB of data.

Now you can run multiple queries or build a dashboard on the reduced dataset.

For example, the following query aggregates the total count of each Athena API, grouping results by IAM user, date, and API event name.  This query took only 1.8 seconds to complete.

SELECT 
  dt, 
  eventname,
  count(eventname) as event_count,
  split_part(useridentity.arn, ':', 6) as user
FROM athena_30_days
GROUP BY 1,2,4
ORDER BY event_count DESC

Use case 2: Selecting a smaller number of columns

In this use case, I join the CloudTrail table with the S3 Inventory table while only selecting specific columns relevant to my analysis.  I use CTAS to generate a table from the results.

CREATE TABLE athena_s3_30_days
AS
SELECT 
  json_extract_scalar(ct.requestparameters, '$.bucketName') AS bucket,
  json_extract_scalar(ct.requestparameters, '$.key') AS key,
  ct.useridentity.username AS username,
  ct.eventname,
  cast (from_iso8601_timestamp(ct.eventtime) as timestamp) as ts,
  s3.storage_class,
  s3.size
FROM cloudtrail_logs ct
JOIN s3inventory s3 
ON json_extract_scalar(ct.requestparameters, '$.bucketName') = s3.bucket
AND json_extract_scalar(ct.requestparameters, '$.key') = s3.key
AND date(from_iso8601_timestamp(ct.eventtime)) = date(s3.last_modified_date)
WHERE ct.eventsource = 's3.amazonaws.com' 
AND ct.eventname = 'GetObject'
AND ct.useridentity.invokedby LIKE '%athena%'
AND date(from_iso8601_timestamp(eventtime)) 
    BETWEEN current_date - interval '30' day AND current_date

The previous query example returns the last 30 days of S3 GetObject API events that were invoked by the Athena service.  It adds the S3 object size and storage class for each event returned from the S3 Inventory table.

We can then, for example, count the number of times each key has been accessed by Athena, ordering the results by the access count.  This gives us an indication of the size of the files we’re scanning and how often we scan them, which helps us decide whether to optimize by compacting those keys.

SELECT
  bucket,
  size,
  key,
  count(key) AS key_count
FROM athena_s3_30_days
GROUP BY 1,2,3
ORDER BY key_count DESC

In the case of my example, it looks like this:

Use case 3: Repartitioning an existing table

The third use case where CTAS adds value is taking an existing, unoptimized dataset, converting it to Apache ORC, and partitioning it to better serve repeated queries.  We take the CloudTrail events and partition them by year and month.

CREATE TABLE cloudtrail_partitioned
WITH (
  partitioned_by = ARRAY['year', 'month'],
  format = 'orc',
  external_location = 's3://royon-demo/cloudtrail_partitioned'
)
AS
SELECT
  *, 
  year(date(from_iso8601_timestamp(eventtime))) as year,
  month(date(from_iso8601_timestamp(eventtime))) as month
FROM cloudtrail_logs

Notice that I’ve added a WITH clause following the CREATE TABLE keywords but before the AS keyword.  Within the WITH clause, we can define the table properties that we want.  In this particular case, we declared “year” and “month” as our partitioning columns and defined ORC as the output format.  The reason I used ORC is that CloudTrail data may contain empty columns, which the Parquet specification doesn’t allow but ORC does.  Additionally, I defined the external S3 location where the table data is stored.  If we don’t define an external location, Athena uses the default query result S3 location.

The resulting S3 destination bucket looks similar to the following example:

An additional optimization supported by Athena CTAS is bucketing.  Partitioning is used to group similar types of data based on a specific column.  Bucketing is commonly used to combine data within a partition into a number of equal groups, or files.  Therefore, partitioning is best suited for low cardinality columns and bucketing is best suited for high cardinality columns.  For more information, see Bucketing vs Partitioning.

Let’s take the previous CTAS example and add bucketing.

CREATE TABLE cloudtrail_partitioned_bucketed
WITH (
  partitioned_by = ARRAY['year', 'month'],
  bucketed_by = ARRAY['eventname'],
  bucket_count = 3,
  format = 'orc',
  external_location = 's3://royon-demo/cloudtrail_partitioned_bucketed'
)
AS
SELECT
  *, 
  year(date(from_iso8601_timestamp(eventtime))) as year,
  month(date(from_iso8601_timestamp(eventtime))) as month
FROM cloudtrail_logs


And this is what it looks like in S3:

Here is an example query on both a partitioned table and a partitioned and bucketed table.  You can see that the speed is similar, but that the bucketed query scans less data.

Partitioned table:

Partitioned and bucketed table:

Conclusion

In this post, we introduced CREATE TABLE AS SELECT (CTAS) in Amazon Athena. CTAS lets you create a new table from the result of a SELECT query. The new table can be stored in Parquet, ORC, Avro, JSON, and TEXTFILE formats. Additionally, the new table can be partitioned and bucketed for improved performance. We looked at how CTAS helps with three common use cases:

  1. Reducing a large dataset into a smaller, more efficient dataset.
  2. Selecting a subset of the columns and rows to only deliver what the consumer of the data really needs.
  3. Partitioning and bucketing a dataset that is not currently optimized to improve performance and reduce the cost.

Additional Reading

If you found this post useful, be sure to check out How Realtor.com Monitors Amazon Athena Usage with AWS CloudTrail and Amazon QuickSight.



About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan and enjoys cheering his team on and hanging out with his family.


Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:


The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that incoming events often contain all or only some of the expected fields, depending on what the producers send. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.
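
If you prefer the API over the console, a sketch like the following creates both streams; the stream names are placeholders:

// Create two single-shard Kinesis data streams for the Amazon Connect events.
const AWS = require('aws-sdk')
const kinesis = new AWS.Kinesis({region: 'us-east-1'})

const streams = ['connect-agent-events', 'connect-ctr-events']   // placeholder names
streams.forEach((name) => {
  kinesis.createStream({ StreamName: name, ShardCount: 1 }, (err) => {
    if (err) console.error(err)
    else console.log(`Created stream ${name}`)
  })
})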

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash (“/”) so that Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API to update the OutputFormatConfiguration.
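
Here is a minimal sketch of that update. The delivery stream name is a placeholder, and depending on your configuration you may need to restate the rest of the DataFormatConversionConfiguration in the same call:

// Switch the Parquet output compression from the default SNAPPY to GZIP.
const AWS = require('aws-sdk')
const firehose = new AWS.Firehose({region: 'us-east-1'})

async function setParquetCompression() {
  const desc = await firehose.describeDeliveryStream({
    DeliveryStreamName: 'kfhconnectblog'                        // placeholder name
  }).promise()

  await firehose.updateDestination({
    DeliveryStreamName: 'kfhconnectblog',
    CurrentDeliveryStreamVersionId: desc.DeliveryStreamDescription.VersionId,
    DestinationId: desc.DeliveryStreamDescription.Destinations[0].DestinationId,
    ExtendedS3DestinationUpdate: {
      DataFormatConversionConfiguration: {
        OutputFormatConfiguration: {
          Serializer: { ParquetSerDe: { Compression: 'GZIP' } }
        }
      }
    }
  }).promise()
}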

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as strings even though the documentation describes them as complex structures.  I did that simply to show Athena’s flexibility in parsing JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions each of the agents engaged with. It tells us whether they were incoming or outgoing, whether they were completed, and where there were missed or failed calls.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.



Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan and enjoys cheering his team on and hanging out with his family.