All posts by Chinmayi Narasimhadevara

Introducing enhanced functionality for worker configuration management in Amazon MSK Connect

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/introducing-enhanced-functionality-for-worker-configuration-management-in-amazon-msk-connect/

Amazon MSK Connect is a fully managed service for Apache Kafka Connect. With a few clicks, MSK Connect allows you to deploy connectors that move data between Apache Kafka and external systems.

MSK Connect now supports the ability to delete MSK Connect worker configurations, tag resources, and manage worker configurations and custom plugins using AWS CloudFormation. Together, these new capabilities make it straightforward to manage your MSK Connect resources and automate deployments through CI/CD pipelines.

MSK Connect makes it effortless to stream data to and from Apache Kafka over a private connection without requiring infrastructure management expertise. With a few clicks, you can deploy connectors like an Amazon S3 sink connector for loading streaming data to Amazon Simple Storage Service (Amazon S3), deploy connectors developed by third parties like Debezium for streaming change logs from databases into Apache Kafka, or deploy your own connector customized for your use case.

MSK Connect integrates external systems or AWS services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or continuously copying data from your Apache Kafka cluster into a data sink. The connector can also perform lightweight tasks such as transformation, format conversion, or filtering data before delivering the data to a destination. You can use a plugin to create the connector; these custom plugins are resources that contain the code that defines connector logic.

The primary components of MSK Connect are workers. Each worker is a Java virtual machine (JVM) process that runs the connector logic based on the worker configuration provided. Worker configurations are resources that contain your connector configuration properties and can be reused across multiple connectors. Each worker consists of a set of tasks that copy the data in parallel.

Today, we are announcing three new capabilities in MSK Connect:

  • The ability to delete worker configurations
  • Support for resource tags for enabling resource grouping, cost allocation and reporting, and access control with tag-based policies
  • Support in AWS CloudFormation to manage worker configurations and custom plugins

In the following sections, we look at the new functionalities in more detail.

Delete worker configurations

Connectors for integrating Amazon Managed Streaming for Apache Kafka (Amazon MSK) with other AWS and partner services are usually created using a worker configuration (default or custom). These configurations can grow in number as connectors are created and deleted, potentially creating configuration management issues.

You can now use the new delete worker configuration API to delete unused configurations. The service checks that the worker configuration is not in use by any connectors before deleting it. Additionally, you can now use a prefix filter when listing worker configurations and custom plugins with the ListWorkerConfigurations and ListCustomPlugins API calls. The prefix filter returns only the resources whose names start with the given prefix, so you can quickly find and selectively delete them.
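The following boto3 sketch illustrates both capabilities; the Region and the dev- name prefix are examples, and the delete call fails if a configuration is still in use by a connector:

import boto3

# The kafkaconnect client exposes the MSK Connect management APIs.
kc = boto3.client("kafkaconnect", region_name="us-east-1")

# List only worker configurations whose names start with "dev-" (an example prefix).
response = kc.list_worker_configurations(namePrefix="dev-")

for config in response["workerConfigurations"]:
    print(config["name"], config["workerConfigurationState"])

    # Delete configurations that are no longer needed; the service rejects the call
    # if the configuration is still attached to a connector.
    kc.delete_worker_configuration(
        workerConfigurationArn=config["workerConfigurationArn"]
    )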

To test the new delete API, complete the following steps:

  1. On the Amazon MSK console, create a new worker configuration.
  2. Provide a name and optional description.
  3. In the Worker configuration section, enter your configuration code.

MSK Connect Worker Configuration

After you create the configuration, a Delete option is available on the configuration detail page (see the following screenshot) if the configuration is not being used in any connector.

To support this new API, an additional workerConfigurationState field has been added so you can more easily track the state of a worker configuration. This new state is returned in the API responses for CreateWorkerConfiguration, DescribeWorkerConfiguration, and ListWorkerConfigurations.

MSK Connect Worker Configuration

  4. Choose Delete to delete the worker configuration.
  5. In the confirmation pop-up, enter the name of the worker configuration, then choose Delete.

Delete MSKC Worker Configuration

If the worker configuration is being used with any connector, the Delete option is disabled, as shown in the following screenshot.

Resource tags

MSK Connect now also has support for resource tags. Tags are key-value metadata that can be associated with AWS service resources. You can add tags to connectors, custom plugins, and worker configurations to organize and find resources used across AWS services. In the following screenshots, our example MSK Connect connector, plugin, and worker configuration have been tagged with the resource tag key project and value demo-tags.

You can now tag your Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3 resources with the same project name, for example. Then you can use the tag to search for all resources linked to this particular project for cost allocation, reporting, resource grouping, or access control. MSK Connect supports adding tags when creating resources, applying tags to an existing resource, removing tags from a resource, and querying tags associated with a resource.
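The following boto3 sketch shows these tag operations on an MSK Connect resource; the resource ARN is a placeholder, and the same calls work for connectors, custom plugins, and worker configurations:

import boto3

kc = boto3.client("kafkaconnect", region_name="us-east-1")

# Placeholder ARN; this can be a connector, a custom plugin, or a worker configuration.
resource_arn = "<ARN of your MSK Connect resource>"

# Apply the same project tag shown in the screenshots.
kc.tag_resource(resourceArn=resource_arn, tags={"project": "demo-tags"})

# Query the tags associated with the resource.
print(kc.list_tags_for_resource(resourceArn=resource_arn)["tags"])

# Remove the tag when it's no longer needed.
kc.untag_resource(resourceArn=resource_arn, tagKeys=["project"])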

AWS CloudFormation support

Previously, you could provision an MSK Connect connector with AWS CloudFormation only by using an existing worker configuration. With this new feature, in addition to performing CREATE, READ, UPDATE, DELETE, and LIST operations on connectors, you can now create and manage worker configurations and custom plugins with AWS CloudFormation.

The following code is an example of creating a worker configuration:

{
  "Type": "AWS::KafkaConnect::WorkerConfiguration",
  "Properties": {
    "Name": "WorkerConfigurationName",
    "Description": "WorkerConfigurationDescription",
    "PropertiesFileContent": String,
    "Tags": [ Tag, ... ]
  }
}

The return values are as follows:

  • ARN of the newly created worker configuration
  • State of the new worker configuration
  • Creation time of the new worker configuration
  • Latest revision of the new worker configuration
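If you create the worker configuration programmatically instead, the CreateWorkerConfiguration API returns the same information. The following boto3 sketch is illustrative; the name and worker properties are examples, and propertiesFileContent is assumed to be the Base64-encoded contents of your properties file:

import base64
import boto3

kc = boto3.client("kafkaconnect", region_name="us-east-1")

# Example worker properties; the API expects the file contents Base64 encoded.
properties = (
    b"key.converter=org.apache.kafka.connect.storage.StringConverter\n"
    b"value.converter=org.apache.kafka.connect.storage.StringConverter\n"
)

response = kc.create_worker_configuration(
    name="WorkerConfigurationName",                # example name
    description="WorkerConfigurationDescription",  # example description
    propertiesFileContent=base64.b64encode(properties).decode("utf-8"),
)

# The response carries the same fields listed above.
print(response["workerConfigurationArn"])
print(response["workerConfigurationState"])
print(response["creationTime"])
print(response["latestRevision"])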

Conclusion

MSK Connect is a fully managed service that provisions the required resources, monitors the health and delivery state of connectors, maintains the underlying hardware, and auto scales connectors to balance workloads. In this post, we discussed the new features added to MSK Connect that streamline connector and worker management: APIs for deleting worker configurations, tagging MSK Connect resources, and support in AWS CloudFormation for creating non-default worker configurations and custom plugins.

These capabilities are available in all AWS Regions where Amazon MSK Connect is available. For a list of Region availability, refer to AWS Services by Region. To learn more about MSK Connect, visit the Amazon MSK Connect Developer Guide.


About the Authors

Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 20 years of experience in information technology. She helps AWS customers build advanced, highly scalable and performant solutions.

Harita Pappu is a Technical Account Manager based out of California. She has over 18 years of experience in the software industry building and scaling applications. She is passionate about new technologies and focused on helping customers achieve cost optimization and operational excellence.

Query your Amazon MSK topics interactively using Amazon Kinesis Data Analytics Studio

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/query-your-amazon-msk-topics-interactively-using-amazon-kinesis-data-analytics-studio/

Amazon Kinesis Data Analytics Studio makes it easy to analyze streaming data in real time and build stream processing applications powered by Apache Flink using standard SQL, Python, and Scala. With a few clicks on the AWS Management Console, you can launch a serverless notebook to query data streams and get results in seconds. Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for stream processing applications.

If you’re running Apache Flink workloads, you may experience the non-trivial challenge of developing your distributed stream processing applications without having true visibility into the steps your application performs for data processing. Kinesis Data Analytics Studio combines the ease of use of Apache Zeppelin notebooks with the power of the Apache Flink processing engine to provide advanced streaming analytics capabilities in a fully managed offering. This accelerates developing and running stream processing applications that continuously generate real-time insights.

In this post, we introduce you to Kinesis Data Analytics Studio and how to get started querying data interactively from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster using SQL, Python, and Scala. We also demonstrate how to query data across different topics using Kinesis Data Analytics Studio. Kinesis Data Analytics Studio is also compatible with Amazon Kinesis Data Streams, Amazon Simple Storage Service (Amazon S3), and a variety of other data sources supported by Apache Flink.

Prerequisites

To get started, you must have the following prerequisites:

  • An MSK cluster
  • A data generator for populating data into the MSK cluster

To follow this guide and interact with your streaming data, you need a data stream with data flowing through it.

Create and set up a Kafka cluster

You can create your Kafka cluster using either the Amazon MSK console or the AWS Command Line Interface (AWS CLI). For console instructions, see Getting Started Using Amazon MSK and Creating a Studio notebook with Amazon MSK.

You can either create topics and messages or use existing topics in the MSK cluster.

For this post, we have two topics in the MSK cluster, impressions and clicks, and they have the following fields in JSON format:

  • impressions – bid_id, campaign_id, country_code, creative_details, i_timestamp
  • clicks – correlation_id, tracker, c_timestamp

The correlation_id in clicks is the click correlation ID for a bid_id in impressions, so the two topics share common values that we use for the join.

For the data in the MSK topics, we use the Amazon MSK Data Generator. Refer to the GitHub repo for setup and usage details. We use the adtech.json sample for this post.

The following are sample JSON records generated for the impressions topic:

{
   "country_code": "KN",
   "creative_details": "orchid",
   "i_timestamp": "Sat Jul 10 05:34:56 GMT 2021",
   "campaign_id": "1443403873",
   "bid_id": "0868262269"
}
{
   "country_code": "BO",
   "creative_details": "mint green",
   "i_timestamp": "Sat Jul 10 05:34:56 GMT 2021",
   "campaign_id": "1788762118",
   "bid_id": "1025543335"
}

The following are sample JSON records generated for the clicks topic:

{
   "c_timestamp": "Sat Jul 10 05:34:55 GMT 2021",
   "correlation_id": "0868262269",
   "tracker": "8q4rcfkbjnmicgo4rbw48xajokcm4xhcft7025ea1mt0htrfcvsgl1rusg8e8ez30p7orsmjx76vtrha2fi9qb3iaw8htd9uri9jauz64zdq8ldz7b0o8vzlkxs640hnwxgikpfvy5nno15c9etgrh79niku8hhtnxg94n03f2zci5ztv05jixu1r3p5yeehgm9kfd7szle9kikgo2xy5mlx09mmtlo9ndwqdznwjyj3yk02ufcwui1yvzveqfn"
}
{
   "c_timestamp": "Sat Jul 10 05:35:01 GMT 2021",
   "correlation_id": "0868262269",
   "tracker": "gfwq09yk0jwirg9mw60rrpu88h98tkd9xr645jsdoo7dwu24f8usha14uimtsfltvjmhl4i5rq24lz0aucqn6ji4da4xbo6db7lfezus7twhkw238dqw0pzdt98rn5lk8vf4tk6smkyyq38rhjaeh2ezsmlcg4v7im39u7knj10ofiint4fny0xcgqwta0uwq426oc21b1t8m446tmc6fyy7ops80xonzbzfc4a1xjd4x56x81uyg80dxyu2g7v"
}

Create a Kinesis Data Analytics Studio notebook

You can start interacting with your data stream by following these simple steps:

  1. On the Amazon MSK console, choose Process data in real time.
  2. Choose Apache Flink – Studio Notebook.
  3. Enter the name of your Kinesis Data Analytics Studio notebook and allow the notebook to create an AWS Identity and Access Management (IAM) role.

You can create a custom role for specific use cases on the IAM console.

  4. Choose an AWS Glue database to store the metadata for the sources and destinations that the notebook uses.
  5. Choose Create Studio notebook.

We keep the default settings for the application and can scale up as needed.

  6. After you create the application, choose Start to start the Apache Flink application.
  7. When it’s complete (after a few minutes), choose Open in Apache Zeppelin.

To connect to an MSK cluster, you must specify the same VPC, subnets, and security groups for the Kinesis Data Analytics Studio notebook as were used to create the MSK cluster. If you chose Process data in real time during your setup, this is already set for you.

The Studio notebook is created with an IAM role for the notebook that grants the necessary access for the AWS Glue Data Catalog and tables.

Example applications

Apache Zeppelin supports the Apache Flink interpreter and allows for the use of Apache Flink directly within Zeppelin for interactive data analysis. Within the Flink interpreter, three languages are supported as of this writing: Scala, Python (PyFlink), and SQL. You must specify one of these languages at the top of each paragraph so the notebook interprets the paragraph correctly:

%flink          - Scala environment 
%flink.pyflink  - Python Environment
%flink.ipyflink - ipython Environment
%flink.ssql     - Streaming SQL Environment
%flink.bsql     - Batch SQL Environment 

There are several other predefined variables per interpreter, such as the senv variable in Scala for a StreamExecutionEnvironment, or st_env in Python for the same. You can review the full list of these entry point variables.
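For example, from the Python interpreter you can use st_env to check which tables are currently registered in the catalog; the following is a trivial sketch:

%flink.pyflink

# List the tables that the streaming table environment can currently see.
print(st_env.list_tables())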

In this section, we show the same example code in all three languages to highlight the flexibility Zeppelin affords you for development.

SQL

We use the %flink.ssql(type=update) header to signify to the notebook that this paragraph will be interpreted as Flink SQL. We create two tables from the Kafka topics:

  • impressions – With bid_id, campaign_id, creative_details, country_code, and i_timestamp columns providing details of impressions in the system
  • clicks – With correlation_id, tracker, and c_timestamp providing details of the clicks for an impression.

The tables use the Kafka connector to read from the Kafka topics impressions and clicks in the us-east-1 Region, starting from the earliest offset.

As soon as this statement runs within a Zeppelin notebook, AWS Glue Data Catalog tables are created according to the declaration specified in the create statement, and the tables are available immediately for queries from the MSK cluster.

You don’t need to complete this step if your AWS Glue Data Catalog already contains the tables.

%flink.ssql(type=update)
CREATE TABLE impressions (
bid_id VARCHAR,
creative_details VARCHAR(10),
campaign_id VARCHAR,
country_code VARCHAR(5),
i_timestamp VARCHAR,
serve_time as TO_TIMESTAMP (`i_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR serve_time AS serve_time -INTERVAL '5' SECOND
)
PARTITIONED BY (bid_id)
WITH (
'connector'= 'kafka',
'topic' = 'impressions',
'properties.bootstrap.servers' = '<bootstrap servers shown in the MSK client
info dialog>',
'format' = 'json',
'properties.group.id' = 'testGroup1',
'scan.startup.mode'= 'earliest-offset',
'json.timestamp-format.standard'= 'ISO-8601'
);

CREATE TABLE clicks (
correlation_id VARCHAR,
tracker VARCHAR(100),
c_timestamp VARCHAR,
click_time as TO_TIMESTAMP (`c_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR click_time AS click_time -INTERVAL '5' SECOND
)
PARTITIONED BY (correlation_id)
WITH (
'connector'= 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '<bootstrap servers shown in the MSK client info dialog>',
'format' = 'json',
'properties.group.id' = 'testGroup1',
'scan.startup.mode'= 'earliest-offset',
'json.timestamp-format.standard'= 'ISO-8601'
);

The following screenshot is the AWS Glue Data Catalog view, which shows the tables that represent MSK topics.

In the preceding tables, WATERMARK FOR serve_time AS serve_time - INTERVAL '5' SECOND means that we can tolerate out-of-order delivery of events in the timeframe of 5 seconds and still produce correct results.

After you create the tables, run a query that calculates the number of impressions within a tumbling window of 60 seconds broken down by campaign_id and creative_details:

%flink.ssql(type=update)
SELECT 
 campaign_id, 
 creative_details, 
 TUMBLE_ROWTIME(serve_time, INTERVAL '60' SECOND) 
   AS window_end, COUNT(*) AS c
FROM impressions
GROUP BY 
  TUMBLE(serve_time, INTERVAL '60' SECOND), 
  campaign_id, 
  creative_details
ORDER BY window_end, c DESC;

The results from this query appear as soon as they are available.

Additionally, we want to see the clickthrough rate of the impressions:

SELECT 
  bid_id, 
  campaign_id,
  country_code, 
  creative_details, 
  CAST(serve_time AS TIMESTAMP) AS serveTime, 
  tracker,
  CAST(click_time AS TIMESTAMP) AS clickTime,
  CASE
     WHEN `click_time` IS NULL THEN FALSE
     WHEN `click_time` IS NOT NULL THEN TRUE
  END AS clicked
FROM  impressions 
LEFT OUTER JOIN clicks 
  ON bid_id = correlation_id AND
  click_time BETWEEN  serve_time AND 
  serve_time + INTERVAL '2' MINUTE ;

This query produces one row for each impression and matches it with a click (if any) that was observed within 2 minutes after serving the ad. This is essentially performing a join operation across the topics to get this information.

You can insert this data back into an existing Kafka topic using the following code:

INSERT INTO clickthroughrate 
SELECT 
  bid_id, 
  campaign_id,
  country_code, 
  creative_details, 
  CAST(serve_time AS TIMESTAMP WITHOUT TIME ZONE) AS serveTime, 
  tracker,
  CAST(click_time AS TIMESTAMP WITHOUT TIME ZONE) AS clickTime,
  CASE
     WHEN `click_time` IS NULL THEN FALSE
     WHEN `click_time` IS NOT NULL THEN TRUE
  END AS clicked
FROM  impressions 
LEFT OUTER JOIN clicks 
  ON bid_id = correlation_id AND
  click_time BETWEEN  serve_time AND 
  serve_time + INTERVAL '2' MINUTE ;

Create the corresponding table for the Kafka topic in the Data Catalog if it doesn’t already exist. After you run the preceding query, you can see data in your Amazon MSK topic, as shown in the following sample:

1095810839,1911670336,KH,"mint green","2021-06-15 01:08:00","ainhpsm6vxgs4gvyl52v13s173gntd7jyitlq328qmam37rpbs2tj1il049dlyb2vgwx89dbvwezl2vkcynqvlqfql7pxp8blg6807yxy1y54eedwff2nuhrbqhce36j00mbxdh72fpjmztymobq79y1g3xoyr6f09rgwqna1kbejkjw4nfddmm0d56g3mkd8obrrzo81z0ktu934a00b04e9q0h1krapotnon76rk0pmw6gr8c24wydp0b2yls","2021-06-15 01:08:07",true
0946058105,1913684520,GP,magenta,"2021-06-15 01:07:56","7mlkc1qm9ntazr7znfn9msew75xs9tf2af96ys8638l745t2hxwnmekaft735xdcuq4xtynpxr68orw5gmbrhr9zyevhawjwfbvzhlmziao3qs1grsb5rdzysvr5663qg2eqi5p7braruyb6rhyxkf4x3q5djo7e1jd5t91ybop0cxu4zqmwkq7x8l7c4y33kd4gwd4g0jmm1hy1df443gdq5tnj8m1qaymr0q9gatqt7jg61cznql0z6ix8pyr","2021-06-15 01:08:07",true
0920672086,0888784120,CK,silver,"2021-06-15 01:08:03","gqr76xyhu2dmtwpv9k3gxihvmn7rluqblh39gcrfyejt0w8jwwliq24okxkho1zuyxdw9mp4vzwi0nd4s5enhvm2d74eydtqnmf7fm4jsyuhauhh3d32esc8gzpbwkgs8yymlp22ih6kodrpjj2bayh4bjebcoeb42buzb43ii1e0zv19bxb8suwg17ut2mdhj4vmf8g9jl02p2tthe9w3rpv7w9w16d14bstiiviy4wcf86adfpz378a49f36q","2021-06-15 01:08:16",true

This is the CSV output of the preceding query, which shows the clickthrough results for the impressions. You can use this mechanism to persist data from Flink back into Kafka directly.

Scala

We use the %flink header to signify that this code block will be interpreted via the Scala Flink interpreter, and create a table identical to the one from the SQL example. However, in this example, we use the Scala interpreter’s built-in streaming table environment variable, stenv, to run a SQL DDL statement. If the table already exists in the AWS Glue Data Catalog, this statement issues an error stating that the table already exists.

%flink 
stenv.executeSql("""CREATE TABLE impressions (
  bid_id VARCHAR,
  creative_details VARCHAR(10),
  campaign_id VARCHAR,
  country_code VARCHAR(5),
  i_timestamp VARCHAR,
  serve_time as TO_TIMESTAMP (`i_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
  WATERMARK FOR serve_time AS serve_time -INTERVAL '5' SECOND
  )
  WITH (
  'connector'= 'kafka',
  'topic' = 'impressions',
  'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
  'format' = 'json',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode'= 'earliest-offset',
  'json.timestamp-format.standard'= 'ISO-8601'
  )""")

stenv.executeSql("""
 CREATE TABLE clicks (
 correlation_id VARCHAR,
 tracker VARCHAR(100),
 c_timestamp VARCHAR,
 click_time as TO_TIMESTAMP (`c_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR click_time AS click_time -INTERVAL '5' SECOND
 )
 WITH (
 'connector'= 'kafka',
 'topic' = 'clicks',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'json',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode'= 'earliest-offset',
 'json.timestamp-format.standard'= 'ISO-8601'
 )""")

Performing a tumbling window in the Scala Table API first requires the definition of an in-memory reference to the table we created. We use the stenv variable to define this table using the from function and referencing the table name. After this is created, we can create a windowed aggregation over 1 minute of data based on the serve_time column. See the following code:

%flink
val inputTable: Table = stenv.from("impressions")
val tumblingWindowTable = inputTable.window(Tumble over 1.minute on $"serve_time" as $"oneMinuteWindow")
.groupBy( $"oneMinuteWindow", $"campaign_id",$"creative_details")
.select($"campaign_id", $"creative_details", $"oneMinuteWindow".rowtime as "window_end",$"creative_details".count as "c")

Use the ZeppelinContext to visualize the Scala table aggregation within the notebook:

%flink
z.show(tumblingWindowTable, streamType="update")

The following screenshot shows our results.

Additionally, we want to see the clickthrough rate of the impressions by joining with the clicks:

val left:Table = stenv.from("impressions").select("bid_id,campaign_id,country_code,creative_details,serve_time")
val right:Table = stenv.from("clicks").select("correlation_id,tracker,click_time")
val result:Table = left.leftOuterJoin(right).where($"bid_id" === $"correlation_id" && $"click_time" < ( $"serve_time" + 2.minutes) && $"click_time" > $"serve_time").select($"bid_id", $"campaign_id", $"country_code",$"creative_details",$"tracker",$"serve_time".cast(Types.SQL_TIMESTAMP) as "s_time", $"click_time".cast(Types.SQL_TIMESTAMP) as "c_time" , $"click_time".isNull.?("false","true") as "clicked" )

Use the ZeppelinContext to visualize the Scala table aggregation within the notebook.

z.show(result, streamType="update")

The following screenshot shows our results.

Python

We use the %flink.pyflink header to signify that this code block will be interpreted via the Python Flink interpreter, and create a table identical to the one from the SQL and Scala examples. In this example, we use the Python interpreter’s built-in streaming table environment variable, st_env, to run a SQL DDL statement. If the table already exists in the AWS Glue Data Catalog, this statement issues an error stating that the table already exists.

%flink.pyflink
st_env.execute_sql("""
 CREATE TABLE impressions (
 bid_id VARCHAR,
 creative_details VARCHAR(10),
 campaign_id VARCHAR,
 country_code VARCHAR(5),
 i_timestamp VARCHAR,
 serve_time as TO_TIMESTAMP (`i_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR serve_time AS serve_time -INTERVAL '5' SECOND
 )
 WITH (
 'connector'= 'kafka',
 'topic' = 'impressions',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'json',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode'= 'earliest-offset',
 'json.timestamp-format.standard'= 'ISO-8601'
 )""")
 
st_env.execute_sql("""
 CREATE TABLE clicks (
 correlation_id VARCHAR,
 tracker VARCHAR(100),
 c_timestamp VARCHAR,
 click_time as TO_TIMESTAMP (`c_timestamp`, 'EEE MMM dd HH:mm:ss z yyyy'),
 WATERMARK FOR click_time AS click_time -INTERVAL '5' SECOND
 )
 WITH (
 'connector'= 'kafka',
 'topic' = 'clicks',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'json',
 'properties.group.id' = 'testGroup1',
 'scan.startup.mode'= 'earliest-offset',
 'json.timestamp-format.standard'= 'ISO-8601'
 )""")

Performing a tumbling window in the Python Table API first requires the definition of an in-memory reference to the table we created. We use the st_env variable to define this table using the from_path function and referencing the table name. After this is created, we can create a windowed aggregation over 1 minute of data based on the serve_time column. See the following code:

%flink.pyflink

input_table = st_env.from_path("impressions")
tumbling_window_table =(input_table.window(Tumble.over("1.minute").on("serve_time").alias("one_minute_window"))
.group_by( "one_minute_window, campaign_id, creative_details")
.select("campaign_id, creative_details, one_minute_window.end as window_end, creative_details.count as c"))

Use the ZeppelinContext to visualize the Python table aggregation within the notebook:

%flink.pyflink

z.show(tumbling_window_table, stream_type="update")

The following screenshot shows our results.

Additionally, we want to see the clickthrough rate of the impressions by joining with the clicks:

impressions = st_env.from_path("impressions").select("bid_id,campaign_id,country_code,creative_details,serve_time")
clicks = st_env.from_path("clicks").select("correlation_id,tracker,click_time")
results = impressions.left_outer_join(clicks).where("bid_id == correlation_id && click_time < (serve_time + 2.minutes) && click_time > serve_time").select("bid_id, campaign_id, country_code, creative_details, tracker, serve_time.cast(STRING) as s_time, click_time.cast(STRING) as c_time, (click_time.isNull).?('false','true') as clicked")
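If you also want to write the joined results back to a Kafka topic, as shown with INSERT INTO clickthroughrate in the SQL example, the sink table needs to exist in the Data Catalog as well. The following is a minimal sketch of such a definition run from the Python interpreter; the column types and the csv format are assumptions based on the sample output shown earlier:

%flink.pyflink
st_env.execute_sql("""
 CREATE TABLE clickthroughrate (
 bid_id VARCHAR,
 campaign_id VARCHAR,
 country_code VARCHAR,
 creative_details VARCHAR,
 serve_time TIMESTAMP,
 tracker VARCHAR,
 click_time TIMESTAMP,
 clicked BOOLEAN
 )
 WITH (
 'connector'= 'kafka',
 'topic' = 'clickthroughrate',
 'properties.bootstrap.servers' = '< Bootstrap Servers shown in the MSK client info dialog >',
 'format' = 'csv'
 )""")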

Scaling

A Studio notebook consists of one or more tasks. You can split a Studio notebook task into several parallel instances to run, where each parallel instance processes a subset of the task’s data. The number of parallel instances of a task is called its parallelism, and adjusting that helps run your tasks more efficiently.

On creation, Studio notebooks are given four parallel Kinesis Processing Units (KPUs), which make up the application parallelism. To increase that parallelism, navigate to the Kinesis Data Analytics console, choose your application name, and choose the Configuration tab.

From this page, in the Scaling section, choose Edit and modify the Parallelism entry. We don’t recommend increasing the Parallelism Per KPU setting higher than 1 unless your application is I/O bound.

Choose Save changes to increase or decrease your application’s parallelism.
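If you prefer to script this change, the Kinesis Data Analytics UpdateApplication API exposes the same parallelism settings. The following boto3 sketch is illustrative only; the application name is a placeholder, and it assumes these parallelism settings can be updated on Studio notebooks the same way as on Kinesis Data Analytics for Apache Flink applications:

import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="us-east-1")

app_name = "<your Studio notebook name>"

# UpdateApplication requires the current application version ID.
version_id = kda.describe_application(ApplicationName=app_name)[
    "ApplicationDetail"]["ApplicationVersionId"]

# Raise overall parallelism while keeping parallelism per KPU at 1.
kda.update_application(
    ApplicationName=app_name,
    CurrentApplicationVersionId=version_id,
    ApplicationConfigurationUpdate={
        "FlinkApplicationConfigurationUpdate": {
            "ParallelismConfigurationUpdate": {
                "ConfigurationTypeUpdate": "CUSTOM",
                "ParallelismUpdate": 8,
                "ParallelismPerKPUUpdate": 1,
            }
        }
    },
)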

Clean up

You may want to clean up the demo environment when you’re done. To do so, stop the Studio notebook and delete the resources created for the data generator and the Amazon MSK cluster (if you created a new cluster).
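If you prefer to script the cleanup, the following boto3 sketch stops the Studio notebook; the application name is a placeholder, and the remaining resources should be removed with whichever mechanism you used to create them:

import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="us-east-1")

# Stop the Studio notebook when you're done experimenting.
kda.stop_application(ApplicationName="<your Studio notebook name>")

# Delete the data generator and the MSK cluster using whichever mechanism you used
# to create them (for example, by deleting their CloudFormation stacks).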

Summary

Kinesis Data Analytics Studio makes developing stream processing applications using Apache Flink much faster, with rich visualizations, a scalable and user-friendly interface to develop pipelines, and the flexibility of language choice to make any streaming workload performant and powerful. You can run paragraphs from within the notebook or promote your Studio notebook to a Kinesis Data Analytics for Apache Flink application with a durable state, as shown in the SQL example in this post.

For more information, see the following resources:


About the Author

Chinmayi Narasimhadevara is a Solutions Architect focused on Big Data and Analytics at Amazon Web Services. Chinmayi has over 15 years of experience in information technology. She helps AWS customers build advanced, highly scalable and performant solutions.