Tag Archives: Amazon Managed Streaming for Apache Kafka

Vortexa delivers real-time insights on Amazon MSK with Lenses.io

Post Syndicated from Andrew Stevenson original https://aws.amazon.com/blogs/big-data/vortexa-delivers-real-time-insights-on-amazon-msk-with-lenses-io/

This is a guest post by Lenses.io. In their own words, “Lenses.io is a group of engineers, designers, developers, data analysts, tinkerers, writers, and open-source contributors. We specialize in the democratizing of data technologies. That’s why we developed Lenses.io (Lenses-eye-oh) to help enterprises simplify their operations by making data work for teams, not the other way around.”

This post discusses how Vortexa harnesses the power of Apache Kafka to improve real-time data accuracy and accelerate time-to-market by using a combination of Lenses.io for greater observability and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to create clusters on demand.

Vortexa provides a real-time analytics platform for seaborne oil supply, which enables energy market players to make smarter trading and shipping decisions. Their service needs to process tens of millions of data points per day, and instant insights on the slightest changes in ship movements are crucial to their customers.

That’s why Vortexa’s data analytics team relies on powerful data streaming technologies such as Apache Kafka to make their real-time applications work.

However, as they scaled their self-managed Apache Kafka environment, they saw several challenges emerge:

  • Debugging incidents took days. Investigation of incidents involved a series of in-house developed scripts and open-source tools.
  • The smallest mistake could bring down a whole cluster. The team ran the risk of downtime due to difficulties in manual configuration and a nebulous command line.
  • Slowdowns affected production. This meant more time spent on everyday tasks, such as assessing the data quality, and less time shipping new features to market.
  • Specialist skills were difficult to come by. Having a specialist in-house for every data platform permutation—like someone who understood how to both build and operate Apache Kafka—wasn’t wise or sustainable.

And so Fridays for Vortexa became “Kafka Fridays.” The fragilities and complexities in an all-important Apache Kafka infrastructure cost the team their Fridays and often their weekends, and also ate into their release schedules.

Self-healing and observability save the day

To address this, the Vortexa team evaluated several managed solutions, and found that Amazon MSK best met their requirements and budget. Amazon MSK continuously monitors cluster performance, is self-healing, and automatically replaces unhealthy nodes without downtime.

Already users of multiple other AWS services, Vortexa benefited from Amazon MSK integrating with other AWS services such as Amazon Virtual Private Cloud (Amazon VPC), AWS Key Management Service (AWS KMS), and AWS Identity and Access Management (IAM). Amazon MSK can also integrate with Amazon CloudWatch, AWS CloudTrail, and Lenses.io for more effective, integrated monitoring and logging of issues.

However, Apache Kafka was still a black box when it came to understanding their deployed flows. Vortexa uses Lenses.io as their data operations portal, which gives them Apache Kafka monitoring, data observability, and governance control for the real-time applications that run across their Amazon MSK and Kubernetes platform. With a clear UI, they can troubleshoot a problem in a streaming application within a few minutes instead of the hours it took before, an improvement of at least 30-fold.

“Lenses.io gives us a complete vision and extra management capabilities for our streaming applications in Apache Kafka clusters,” says Nguyen Lam Phuc, Senior DevOps Engineer at Vortexa. “Testing as well as operations suddenly became painless and felt much more intuitive than ever before.”

Previously, the team also needed to manually run commands, add connectors involving trial and error, and implement complex deployment procedures—and cross their fingers that this would work.

“In our four-year history, we’ve had at least five different ways of operating, and four different ways of monitoring Apache Kafka,” says Jakub Korzeniowski, Head of Data Services at Vortexa. “The combination of Lenses.io and Amazon MSK has been the only solution that allows us to focus on business logic instead of concentrating the majority of our efforts on just meeting SLAs.”

Now, through a single dashboard, Vortexa can use Lenses.io to view their application topology and expose consumer lag metrics that they can visualize and alert on in Lenses.io before forwarding to third-party tools such as Slack and PagerDuty.

Open monitoring with Amazon MSK and Lenses.io

You can monitor the health of the Amazon MSK infrastructure within external tools using the AWS open monitoring framework. Vortexa uses open monitoring with Prometheus to access JMX metrics that Amazon MSK brokers generate. The following diagram illustrates this architecture.
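Open monitoring is enabled per cluster through the Amazon MSK API. The following is a minimal sketch using boto3 that turns on the Prometheus JMX and node exporters; the cluster ARN is a placeholder. Once enabled, each broker exposes Prometheus-format metrics (the JMX exporter on port 11001 and the node exporter on port 11002) for Prometheus to scrape.

import boto3

kafka = boto3.client("kafka", region_name="us-east-1")

# Placeholder ARN; look up your own with kafka.list_clusters().
cluster_arn = "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/abcd1234"

# The current cluster version is required for any update call.
current_version = kafka.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]["CurrentVersion"]

# Turn on the Prometheus JMX and node exporters on every broker.
kafka.update_monitoring(
    ClusterArn=cluster_arn,
    CurrentVersion=current_version,
    OpenMonitoring={
        "Prometheus": {
            "JmxExporter": {"EnabledInBroker": True},
            "NodeExporter": {"EnabledInBroker": True},
        }
    },
)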

Value in data quality and release velocity for Vortexa

Maksym Schipka, Vortexa’s CTO, explains how Amazon MSK and Lenses.io saved 15% of working hours in Apache Kafka management: “At least twice a week, we would experience issues with Kafka. We were spending so long trying to understand the issue that it was more efficient for us to replace the cluster with a completely new one.

“Amazon MSK and Lenses.io have been pivotal technologies for Vortexa, enabling us to shift significant efforts from maintaining and stabilizing a complex and fragile Kafka infrastructure to focusing more on the quality of analytics and market insights that directly impact the value we deliver to our customers.”

“We can now confidently and frequently update our applications at scale, run complex Kafka streams topologies with ease, and debug applications instead of infrastructure. Our revenue and market share are directly impacted by the speed at which we deliver accurate data to the market, and Amazon MSK and Lenses.io are key enablers of that.”

Fast-tracking to production in minutes, not days

Vortexa can now build new data pipelines that integrate data across their different technologies, including Amazon Elasticsearch Service and Redis, and deploy into production in minutes.

Everyday tasks, such as inspecting messages on a stream with SQL, understanding the profile of a topic, or verifying the replication factor are now easy and performed in under 10 seconds, without the need to have specific in-house Apache Kafka expertise.

“Amazon MSK with Lenses SQL is one step towards making Kafka feel like any other database,” says Romain Guion, Head of Signal Processing and Enrichment at Vortexa. “It allows us to verify the outputs of our models well, which recently resulted in a 90% decrease in AIS data affected by signal noise and spoofing.”

The Vortexa team sees a real-time topological view of their entire data landscape, including their flows to Elasticsearch and Redis and inside their Apache Kafka streams applications.

Conclusion

Using Amazon MSK and Lenses.io helped Vortexa to:

  • Save 15% of working hours on Apache Kafka management
  • Deploy new Amazon MSK clusters into production in minutes, not days
  • Increase Apache Kafka users (both operators and application developers) by 200%
  • Reduce automatic identification system signal noise by 90%, aided by quality assurance done exclusively in Lenses.io

To learn how to get greater visibility into your data streams and streaming platform, visit Lenses.io. For information on how to build and run production applications on Apache Kafka without needing Apache Kafka infrastructure management expertise, visit Amazon Managed Streaming for Apache Kafka.

 


About the author

Andrew Stevenson is the Chief Technology Officer and co-founder of Lenses.io. He leads Lenses.io’s world-class engineering team and technical strategy.

 

 

New – Serverless Streaming ETL with AWS Glue

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-serverless-streaming-etl-with-aws-glue/

When you have applications in production, you want to understand what is happening, and how the applications are being used. To analyze data, a first approach is a batch processing model: a set of data is collected over a period of time, then run through analytics tools. To be able to react quickly, you can use a streaming model, where data is processed as it arrives, a record at a time or in micro-batches of tens, hundreds, or thousands of records.

Managing continuous ingestion pipelines and processing data on-the-fly is quite complex, because it’s an always-on system that needs to be managed, patched, scaled, and generally taken care of. Today, we are making this easier and more cost-effective to implement by extending AWS Glue jobs, based on Apache Spark, to run continuously and consume data from streaming platforms such as Amazon Kinesis Data Streams and Apache Kafka (including the fully-managed Amazon MSK).

In this way, Glue can provision, manage, and scale the infrastructure needed to ingest data to data lakes on Amazon S3, data warehouses such as Amazon Redshift, or other data stores. For example, you can store streaming data in a DynamoDB table for quick lookups, or in Elasticsearch to look for specific patterns. This procedure is usually referred to as extract, transform, load (ETL).

As you process streaming data in a Glue job, you have access to the full capabilities of Spark Structured Streaming to implement data transformations, such as aggregating, partitioning, and formatting as well as joining with other data sets to enrich or cleanse the data for easier analysis. For example, you can access an external system to identify fraud in real-time, or use machine learning algorithms to classify data, or detect anomalies and outliers.

Processing Streaming Data with AWS Glue
To try this new feature, I want to collect data from IoT sensors and store all data points in an S3 data lake. I am using a Raspberry Pi with a Sense HAT to collect temperature, humidity, barometric pressure, and its position in space in real-time (using the integrated gyroscope, accelerometer, and magnetometer). Here’s an architectural view of what I am building:

First, I register the device with AWS IoT Core, and run the following Python code to send, once per second, a JSON message with sensor data to the streaming-data MQTT topic. I have a single device in this setup; with more devices, I would use a subtopic per device, for example streaming-data/{client_id}.

import sys
import time
import datetime
import json
from sense_hat import SenseHat
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder

sense = SenseHat()

topic = "streaming-data"
client_id = "raspberrypi"

# Callback when connection is accidentally lost.


def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(
        return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))


def collect_and_send_data():
    publish_count = 0
    while(True):

        humidity = sense.get_humidity()
        print("Humidity: %s %%rH" % humidity)

        temp = sense.get_temperature()
        print("Temperature: %s C" % temp)

        pressure = sense.get_pressure()
        print("Pressure: %s Millibars" % pressure)

        orientation = sense.get_orientation_degrees()
        print("p: {pitch}, r: {roll}, y: {yaw}".format(**orientation))

        timestamp = datetime.datetime.fromtimestamp(
            time.time()).strftime('%Y-%m-%d %H:%M:%S')

        message = {
            "client_id": client_id,
            "timestamp": timestamp,
            "humidity": humidity,
            "temperature": temp,
            "pressure": pressure,
            "pitch": orientation['pitch'],
            "roll": orientation['roll'],
            "yaw": orientation['yaw'],
            "count": publish_count
        }
        print("Publishing message to topic '{}': {}".format(topic, message))

        mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(message),
            qos=mqtt.QoS.AT_LEAST_ONCE)
        time.sleep(1)
        publish_count += 1


if __name__ == '__main__':
    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint="a1b2c3d4e5f6g7-ats.iot.us-east-1.amazonaws.com",
        cert_filepath="rapberrypi.cert.pem",
        pri_key_filepath="rapberrypi.private.key",
        client_bootstrap=client_bootstrap,
        ca_filepath="root-CA.crt",
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=client_id,
        clean_session=False,
        keep_alive_secs=6)

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe
    print("Subscribing to topic '{}'...".format(topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

    subscribe_result = subscribe_future.result()
    print("Subscribed with {}".format(str(subscribe_result['qos'])))

    collect_and_send_data()

This is an example of the JSON messages sent by the device:

{
    "client_id": "raspberrypi",
    "timestamp": "2020-04-16 11:33:23",
    "humidity": 39.35261535644531,
    "temperature": 30.10732078552246,
    "pressure": 1020.447509765625,
    "pitch": 4.044007304723748,
    "roll": 7.533848064912158,
    "yaw": 77.01560798660883,
    "count": 104
}

In the Kinesis console, I create the my-data-stream data stream (1 shard is more than enough for my workload). Back in the AWS IoT console, I create an IoT rule to send all data from the MQTT topic to this Kinesis data stream.
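The same stream and rule can be created programmatically. Here is a minimal boto3 sketch; the rule name and IAM role ARN are placeholders, and the role must allow kinesis:PutRecord on the stream.

import boto3

kinesis = boto3.client("kinesis")
iot = boto3.client("iot")

# One shard is enough for this workload.
kinesis.create_stream(StreamName="my-data-stream", ShardCount=1)

# Route every message published on the streaming-data MQTT topic to the stream.
iot.create_topic_rule(
    ruleName="SendSensorDataToKinesis",  # hypothetical rule name
    topicRulePayload={
        "sql": "SELECT * FROM 'streaming-data'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [{
            "kinesis": {
                "roleArn": "arn:aws:iam::123456789012:role/iot-to-kinesis",  # placeholder role
                "streamName": "my-data-stream",
                "partitionKey": "${newuuid()}",
            }
        }],
    },
)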

Now that all sensor data is sent to Kinesis, I can leverage the new Glue integration to process data as it arrives. In the Glue console, I manually add a table in the Glue Data Catalog. I select Kinesis as the type of source, and enter my stream name and the endpoint of the Kinesis Data Streams service. Note that for Kafka streams, before creating the table, you need to create a Glue connection.

I select JSON as data format, and define the schema for the streaming data. If I don’t specify a column here, it will be ignored when processing the stream.

After that, I confirm the final recap step, and create the my_streaming_data table. We are working to add schema inference to streaming ETL jobs. With that, specifying the full schema up front won’t be necessary. Stay tuned.

To process the streaming data, I create a Glue job. For the IAM role, I create a new one attaching the AWSGlueServiceRole and AmazonKinesisReadOnlyAccess managed policies. Depending on your use case and the setup of your AWS accounts, you may want to use a role providing more fine-grained access.

For the data source, I select the table I just created, receiving data from the Kinesis stream.

To get a script generated by Glue, I select the Change schema transform type. As target, I create a new table in the Glue Data Catalog, using an efficient format like Apache Parquet. The Parquet files generated by this job are going to be stored in an S3 bucket whose name starts with aws-glue- (including the final hyphen). By following the naming convention for resources specified in the AWSGlueServiceRole policy, this job has the required permissions to access those resources.

I leave the default mapping that keeps in output all the columns in the source stream. In this way, I can ingest all the records using the proposed script, without having to write a single line of code.

I quickly review the proposed script and save. Each record is processed as a DynamicFrame, and I can apply any of the Glue PySpark Transforms or any transforms supported by Spark Structured Streaming. By default with this configuration, only ApplyMapping is used.
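For reference, the generated streaming script follows roughly the shape sketched below. This is a simplified illustration rather than the exact code Glue produces; the database, table, bucket, and column names are assumptions based on this walkthrough.

import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read micro-batches from the Kinesis-backed Data Catalog table.
data_frame = glueContext.create_data_frame.from_catalog(
    database="sensors",                 # assumed database name
    table_name="my_streaming_data",
    transformation_ctx="datasource0",
    additional_options={"startingPosition": "TRIM_HORIZON", "inferSchema": "false"},
)

def process_batch(batch_df, batch_id):
    # Each micro-batch becomes a DynamicFrame and is mapped 1:1 to the output columns.
    if batch_df.count() == 0:
        return
    dyf = DynamicFrame.fromDF(batch_df, glueContext, "from_batch")
    mapped = ApplyMapping.apply(
        frame=dyf,
        mappings=[
            ("client_id", "string", "client_id", "string"),
            ("timestamp", "string", "timestamp", "string"),
            ("temperature", "double", "temperature", "double"),
            ("humidity", "double", "humidity", "double"),
            ("pressure", "double", "pressure", "double"),
        ],
        transformation_ctx="applymapping0",
    )
    glueContext.write_dynamic_frame.from_options(
        frame=mapped,
        connection_type="s3",
        connection_options={"path": "s3://aws-glue-my-output-bucket/output/"},  # placeholder bucket
        format="parquet",
        transformation_ctx="sink0",
    )

# Process the stream in windows and track progress in a checkpoint folder.
glueContext.forEachBatch(
    frame=data_frame,
    batch_function=process_batch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": "s3://aws-glue-my-output-bucket/checkpoint/",
    },
)
job.commit()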

I start the job, and after a few minutes I see the Parquet files containing the output of the job appearing in the output S3 bucket. They are partitioned by ingest date (year, month, day, and hour).

To populate the Glue Data Catalog with tables based on the content of the S3 bucket, I add and run a crawler. In the crawler configuration, I exclude the checkpoint folder used by Glue to keep track of the data that has been processed. After less than a minute, a new table has been added.
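The crawler can also be defined with boto3. A minimal sketch follows, in which the crawler name, role, database, and bucket are placeholders, and the Glue checkpoint folder is excluded from crawling:

import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="streaming-output-crawler",           # hypothetical name
    Role="AWSGlueServiceRole-streaming-demo",  # placeholder role
    DatabaseName="sensors",
    Targets={
        "S3Targets": [{
            "Path": "s3://aws-glue-my-output-bucket/output/",
            # Skip the checkpoint folder that the streaming job maintains.
            "Exclusions": ["checkpoint/**"],
        }]
    },
)
glue.start_crawler(Name="streaming-output-crawler")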

In the Amazon Athena console, I refresh the database and tables, and select to preview the output_my_data table, which contains data ingested this year. In this way, I see the first ten records in the table, and get confirmation that my setup is working!

Now, as data is being ingested, I can run more complex queries. For example, I can get the minimum and maximum temperature, collected from the device sensors, and the overall number of records stored in the Parquet files.
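Such a query can also be submitted through the Athena API. The following boto3 sketch assumes the database, table, and results location from this walkthrough:

import boto3

athena = boto3.client("athena")

query = """
SELECT MIN(temperature) AS min_temp,
       MAX(temperature) AS max_temp,
       COUNT(*)         AS records
FROM   output_my_data
"""

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "sensors"},  # assumed database
    ResultConfiguration={"OutputLocation": "s3://aws-glue-my-output-bucket/athena-results/"},
)
print("Query execution ID:", execution["QueryExecutionId"])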

Looking at the results, I see more than 8,000 records have been processed, with a maximum temperature of 31 degrees Celsius (about 88 degrees Fahrenheit). Actually, it was never really this hot. The sensors measure temperature very close to the device, and the reading rises as the device warms up with use.

I am using a single device in this setup, but the solution implemented here can easily scale up with the number of data sources.

Available Now
Support for streaming sources is available in all regions where Glue is offered, as described in the AWS Region table. For more information, please have a look at the documentation.

Managing a serverless ETL pipeline with Glue makes it easier and more cost-effective to set up and manage streaming ingestion processes, reducing implementation efforts so you can focus on the business outcomes of analytics. You can set up a whole ingestion pipeline without writing code, as I did in this walkthrough, or customize the proposed script based on your needs.

Let me know what you are going to use this new feature for!

Danilo

Govern how your clients interact with Apache Kafka using API Gateway

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/govern-how-your-clients-interact-with-apache-kafka-using-api-gateway/

At some point, you may ask yourself:

  • How can I implement IAM authentication or authorization to Amazon Managed Streaming for Apache Kafka (MSK)?
  • How can I protect my Apache Kafka cluster from traffic spikes based on specific scenarios without setting quotas on the cluster?
  • How can I validate requests adhere to a JSON Schema?
  • How can I make sure parameters are included in the URI, query string, and headers?
  • How can Amazon MSK ingest messages from lightweight clients without using an agent or the native Apache Kafka protocol?

These tasks are achievable using custom proxy servers or gateways, but these options can be difficult to implement and manage. On the other hand, API Gateway has these features and is a fully managed AWS service.

In this blog post we will show you how Amazon API Gateway can answer these questions as a component between your Amazon MSK cluster and your clients.

Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with just a few clicks without the need to provision servers, manage storage, or configure Apache Zookeeper manually. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications.

Some use cases include ingesting messages from lightweight IoT devices that don’t have support for native Kafka protocol and orchestrating your streaming services with other backend services including third-party APIs.

This pattern also comes with the following trade-offs:

  • Cost and complexity due to another service to run and maintain.
  • Performance overhead, because it adds extra processing to construct and make HTTP requests. Additionally, the REST Proxy needs to parse requests and transform data between formats for both produce and consume requests.

When you implement this architecture in a production environment, you should consider these points with your business use case and SLA needs.

Solution overview

To implement the solution, complete the following steps:

  1. Create an MSK cluster, Kafka client, and Kafka REST Proxy
  2. Create a Kafka topic and configure the REST Proxy on a Kafka client machine
  3. Create an API with REST Proxy integration via API Gateway
  4. Test the end-to-end processes by producing and consuming messages to Amazon MSK

The following diagram illustrates the solution architecture.

 

Within this architecture, you create an MSK cluster and set up an Amazon EC2 instance with the REST Proxy and Kafka client. You then expose the REST Proxy through Amazon API Gateway and also test the solution by producing messages to Amazon MSK using Postman.

For the production implementation, make sure to set up the REST Proxy behind a load balancer with an Auto Scaling group.

Prerequisites

Before you get started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret access key to configure the AWS CLI
  • An Amazon EC2 keypair

Creating an MSK cluster, Kafka client, and REST Proxy

AWS CloudFormation provisions all the required resources, including VPC, subnets, security groups, Amazon MSK cluster, Kafka client, and Kafka REST Proxy. To create these resources, complete the following steps:

  1. Launch the AWS CloudFormation template in the us-east-1 or us-west-2 Region. The stack takes approximately 15 to 20 minutes to complete.
  2. From the AWS CloudFormation console, choose AmzonMSKAPIBlog.
  3. Under Outputs, get the MSKClusterARN, KafkaClientEC2InstancePublicDNS, and MSKSecurityGroupID details.
  4. Get the ZooKeeperConnectionString and other information about your cluster by entering the following code (provide your Region, cluster ARN, and AWS named profile):
    $ aws kafka describe-cluster --region <Replace_With_us-east-1_or_us-west-2> --cluster-arn <Replace_With_Your_cluster-arn> --profile <Replace_With_Your_Profile>

    The following code example shows one of the lines in the output of this command:

    {
    ….
    ….
    "ZookeeperConnectString": "z-2.XXXXXX.us-east-1.amazonaws.com:2181,z-3.XXXXXX.us-east-1.amazonaws.com:2181,z-1.XXXXXX.us-east-1.amazonaws.com:2181"
    }

  5. Get the BootstrapBrokerString by entering the following code (provide your Region, cluster ARN, and AWS named profile):

    $ aws kafka get-bootstrap-brokers --region <Replace_With_us-east-1_or_us-west-2> --cluster-arn "<Replace_With_Your_cluster-arn>" --profile <Replace_With_Your_Profile>

    The following code example shows the output of this command:

    {
    "BootstrapBrokerString": "b-2.XXXXXXXXXXXX.us-east-1.amazonaws.com:9092,b-1.XXXXXXXXXXXX.amazonaws.com:9092,b-3.XXXXXXXXXXXX.us-east-1.amazonaws.com:9092"
    }

Creating a Kafka topic and configuring a Kafka REST Proxy

To create a Kafka topic and configure a Kafka REST Proxy on a Kafka client machine, complete the following steps:

  1. SSH into your Kafka client Amazon EC2 instance. See the following code:
    ssh -i <Replace_With_Your_pemfile> ec2-user@<Replace_With_Your_KafkaClientDNS>

  2. Go to the bin folder (kafka/kafka_2.12-2.2.1/bin/) of the Apache Kafka installation on the client machine.
  3. Create a topic by entering the following code (provide the value you obtained for ZookeeperConnectString in the previous step):
    ./kafka-topics.sh --create --zookeeper <Replace_With_Your_ZookeeperConnectString> --replication-factor 3 --partitions 1 --topic amazonmskapigwblog

    If the command is successful, you see the following message: Created topic amazonmskapigwblog.

  4. To connect the Kafka REST server to the Amazon MSK cluster, modify kafka-rest.properties in the directory (/home/ec2-user/confluent-5.3.1/etc/kafka-rest/) to point to your Amazon MSK’s ZookeeperConnectString and BootstrapserversConnectString information. See the following code:
    sudo vi /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties
    
    	zookeeper.connect=<Replace_With_Your_ZookeeperConnectString>
    bootstrap.servers=<Replace_With_Your_BootstrapserversConnectString> 

    As an additional, optional step, you can create SSL keys and certificates to secure communication between REST clients and the REST Proxy (HTTPS). If SSL is not required, you can skip steps 5 and 6.

  5. Generate the server and client certificates. For more information, see Creating SSL Keys and Certificates on the Confluent website.
  6. Add the necessary property configurations to the kafka-rest.properties configuration file. See the following code example:
    listeners=http://0.0.0.0:8082,https://0.0.0.0:8085
    ssl.truststore.location=<Replace_With_Your_truststore.jks>
    ssl.truststore.password=<Replace_With_Your_truststorepassword>
    ssl.keystore.location=<Replace_With_Your_keystore.jks>
    ssl.keystore.password=<Replace_With_Your_keystorepassword>
    ssl.key.password=<Replace_With_Your_sslkeypassword>

    For more detailed instructions, see Encryption and Authentication with SSL on the Confluent website.

You have now created a Kafka topic and configured Kafka REST Proxy to connect to your Amazon MSK cluster.

Creating an API with Kafka REST Proxy integration

To create an API with Kafka REST Proxy integration via API Gateway, complete the following steps:

  1. On the API Gateway console, choose Create API.
  2. For API type, choose REST API.
  3. Choose Build.
  4. Choose New API.
  5. For API Name, enter a name (for example, amazonmsk-restapi).
  6. As an optional step, for Description, enter a brief description.
  7. Choose Create API. The next step is to create a child resource.
  8. Under Resources, choose a parent resource item.
  9. Under Actions, choose Create Resource. The New Child Resource pane opens.
  10. Select Configure as proxy resource.
  11. For Resource Name, enter proxy.
  12. For Resource Path, enter /{proxy+}.
  13. Select Enable API Gateway CORS.
  14. Choose Create Resource. After you create the resource, the Create Method window opens.
  15. For Integration type, select HTTP Proxy.
  16. For Endpoint URL, enter an HTTP backend resource URL (your Kafka client Amazon EC2 instance public DNS; for example, http://KafkaClientEC2InstancePublicDNS:8082/{proxy} or https://KafkaClientEC2InstancePublicDNS:8085/{proxy}).
  17. Use the default settings for the remaining fields.
  18. Choose Save.
  19. For SSL, for Endpoint URL, use the HTTPS endpoint. In the API you just created, the API’s proxy resource path of {proxy+} becomes the placeholder for any of the backend endpoints under http://YourKafkaClientPublicIP:8082/.
  20. Choose the API you just created.
  21. Under Actions, choose Deploy API.
  22. For Deployment stage, choose New Stage.
  23. For Stage name, enter the stage name (for example, dev, test, or prod).
  24. Choose Deploy.
  25. Record the Invoke URL after you have deployed the API.

Your external Kafka REST Proxy, which was exposed through API Gateway, now looks like https://YourAPIGWInvokeURL/dev/topics/amazonmskapigwblog. You use this URL in the next step.
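If you prefer to script these console steps, the following boto3 sketch creates the same proxy resource, method, and HTTP proxy integration, then deploys a dev stage. The API name and backend endpoint are placeholders taken from this walkthrough; treat it as a sketch rather than a definitive implementation.

import boto3

apigw = boto3.client("apigateway")

api_id = apigw.create_rest_api(name="amazonmsk-restapi")["id"]

# A fresh API has only the root ("/") resource; use it as the parent.
root_id = apigw.get_resources(restApiId=api_id)["items"][0]["id"]
proxy_id = apigw.create_resource(restApiId=api_id, parentId=root_id, pathPart="{proxy+}")["id"]

apigw.put_method(
    restApiId=api_id,
    resourceId=proxy_id,
    httpMethod="ANY",
    authorizationType="NONE",
    requestParameters={"method.request.path.proxy": True},
)

# Forward every request to the Kafka REST Proxy on the client instance.
apigw.put_integration(
    restApiId=api_id,
    resourceId=proxy_id,
    httpMethod="ANY",
    type="HTTP_PROXY",
    integrationHttpMethod="ANY",
    uri="http://KafkaClientEC2InstancePublicDNS:8082/{proxy}",  # placeholder backend
    requestParameters={"integration.request.path.proxy": "method.request.path.proxy"},
)

apigw.create_deployment(restApiId=api_id, stageName="dev")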

Testing the end-to-end processes

To test the end-to-end process by producing and consuming messages to Amazon MSK, complete the following steps:

  1. SSH into the Kafka Client Amazon EC2 instance. See the following code:
    ssh -i "xxxxx.pem" [email protected]

  2. Go to the confluent-5.3.1/bin directory and start the kafka-rest service. See the following code:
    ./kafka-rest-start /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties

    If the service is already running, you can stop it with the following code:

    ./kafka-rest-stop /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties

  3. Open another terminal window.
  4. In the kafka/kafka_2.12-2.2.1/bin directory, start the Kafka console consumer. See the following code:
    ./kafka-console-consumer.sh --bootstrap-server "BootstrapserversConnectString" --topic amazonmskapigwblog --from-beginning 

    You can now produce messages using Postman, an HTTP client for testing web services. (An equivalent Python request appears after this list.)

    Be sure to open the TCP ports on the Kafka client security group from the system on which you are running Postman.

  5. Under Headers, choose the key Content-Type with value application/vnd.kafka.json.v2+json.
  6. Under Body, select raw.
  7. Choose JSON. This post enters the following code:
    {"records":[{"value":{"deviceid": "AppleWatch4","heartrate": "72","timestamp":"2019-10-07 12:46:13"}}]} 

    The following screenshot shows messages coming to the Kafka consumer from the API Gateway Kafka REST endpoint.
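The same request can be sent from any HTTP client. For example, here is a minimal Python sketch using the requests library, where the invoke URL is the placeholder recorded earlier:

import json
import requests

# Placeholder: the invoke URL recorded when you deployed the API.
url = "https://YourAPIGWInvokeURL/dev/topics/amazonmskapigwblog"

payload = {
    "records": [
        {"value": {"deviceid": "AppleWatch4", "heartrate": "72",
                   "timestamp": "2019-10-07 12:46:13"}}
    ]
}

response = requests.post(
    url,
    headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    data=json.dumps(payload),
)
print(response.status_code, response.text)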

Conclusion

This post demonstrated how easy it is to set up REST API endpoints for Amazon MSK with API Gateway. This solution can help you produce and consume messages to Amazon MSK from any IoT device or programming language without depending on native Kafka protocol or clients.

If you have questions or suggestions, please leave your thoughts in the comments.

 


About the Author

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies, such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.

 

 

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

Learn about AWS Services & Solutions – September AWS Online Tech Talks

Post Syndicated from Jenny Hang original https://aws.amazon.com/blogs/aws/learn-about-aws-services-solutions-september-aws-online-tech-talks/

Learn about AWS Services & Solutions – September AWS Online Tech Talks

AWS Tech Talks

Join us this September to learn about AWS services and solutions. The AWS Online Tech Talks are live, online presentations that cover a broad range of topics at varying technical levels. These tech talks, led by AWS solutions architects and engineers, feature technical deep dives, live demonstrations, customer examples, and Q&A with AWS experts. Register Now!

Note – All sessions are free and in Pacific Time.

Tech talks this month:

 

Compute:

September 23, 2019 | 11:00 AM – 12:00 PM PT – Build Your Hybrid Cloud Architecture with AWS – Learn about the extensive range of services AWS offers to help you build a hybrid cloud architecture best suited for your use case.

September 26, 2019 | 1:00 PM – 2:00 PM PT – Self-Hosted WordPress: It’s Easier Than You Think – Learn how you can easily build a fault-tolerant WordPress site using Amazon Lightsail.

October 3, 2019 | 11:00 AM – 12:00 PM PT – Lower Costs by Right Sizing Your Instance with Amazon EC2 T3 General Purpose Burstable Instances – Get an overview of T3 instances, understand what workloads are ideal for them, and understand how the T3 credit system works so that you can lower your EC2 instance costs today.

 

Containers:

September 26, 2019 | 11:00 AM – 12:00 PM PT – Develop a Web App Using Amazon ECS and AWS Cloud Development Kit (CDK) – Learn how to build your first app using CDK and AWS container services.

 

Data Lakes & Analytics:

September 26, 2019 | 9:00 AM – 10:00 AM PT – Best Practices for Provisioning Amazon MSK Clusters and Using Popular Apache Kafka-Compatible Tooling – Learn best practices on running Apache Kafka production workloads at a lower cost on Amazon MSK.

 

Databases:

September 25, 2019 | 1:00 PM – 2:00 PM PT – What’s New in Amazon DocumentDB (with MongoDB compatibility) – Learn what’s new in Amazon DocumentDB, a fully managed MongoDB compatible database service designed from the ground up to be fast, scalable, and highly available.

October 3, 2019 | 9:00 AM – 10:00 AM PT – Best Practices for Enterprise-Class Security, High-Availability, and Scalability with Amazon ElastiCache – Learn about new enterprise-friendly Amazon ElastiCache enhancements like customer managed key and online scaling up or down to make your critical workloads more secure, scalable and available.

 

DevOps:

October 1, 2019 | 9:00 AM – 10:00 AM PT – CI/CD for Containers: A Way Forward for Your DevOps Pipeline – Learn how to build CI/CD pipelines using AWS services to get the most out of the agility afforded by containers.

 

Enterprise & Hybrid:

September 24, 2019 | 1:00 PM – 2:30 PM PT – Virtual Workshop: How to Monitor and Manage Your AWS Costs – Learn how to visualize and manage your AWS cost and usage in this virtual hands-on workshop.

October 2, 2019 | 1:00 PM – 2:00 PM PT – Accelerate Cloud Adoption and Reduce Operational Risk with AWS Managed Services – Learn how AMS accelerates your migration to AWS, reduces your operating costs, improves security and compliance, and enables you to focus on your differentiating business priorities.

 

IoT:

September 25, 2019 | 9:00 AM – 10:00 AM PT – Complex Monitoring for Industrial with AWS IoT Data Services – Learn how to solve your complex event monitoring challenges with AWS IoT Data Services.

 

Machine Learning:

September 23, 2019 | 9:00 AM – 10:00 AM PT – Training Machine Learning Models Faster – Learn how to train machine learning models quickly and with a single click using Amazon SageMaker.

September 30, 2019 | 11:00 AM – 12:00 PM PT – Using Containers for Deep Learning Workflows – Learn how containers can help address challenges in deploying deep learning environments.

October 3, 2019 | 1:00 PM – 2:30 PM PT – Virtual Workshop: Getting Hands-On with Machine Learning and Ready to Race in the AWS DeepRacer League – Join DeClercq Wentzel, Senior Product Manager for AWS DeepRacer, for a presentation on the basics of machine learning and how to build a reinforcement learning model that you can use to join the AWS DeepRacer League.

 

AWS Marketplace:

September 30, 2019 | 9:00 AM – 10:00 AM PT – Advancing Software Procurement in a Containerized World – Learn how to deploy applications faster with third-party container products.

 

Migration:

September 24, 2019 | 11:00 AM – 12:00 PM PT – Application Migrations Using AWS Server Migration Service (SMS) – Learn how to use AWS Server Migration Service (SMS) for automating application migration and scheduling continuous replication, from your on-premises data centers or Microsoft Azure to AWS.

 

Networking & Content Delivery:

September 25, 2019 | 11:00 AM – 12:00 PM PT – Building Highly Available and Performant Applications using AWS Global Accelerator – Learn how to build highly available and performant architectures for your applications with AWS Global Accelerator, now with source IP preservation.

September 30, 2019 | 1:00 PM – 2:00 PM PT – AWS Office Hours: Amazon CloudFront – Just getting started with Amazon CloudFront and Lambda@Edge? Get answers directly from our experts during AWS Office Hours.

 

Robotics:

October 1, 2019 | 11:00 AM – 12:00 PM PT – Robots and STEM: AWS RoboMaker and AWS Educate Unite! – Come join members of the AWS RoboMaker and AWS Educate teams as we provide an overview of our education initiatives and walk you through the newly launched RoboMaker Badge.

 

Security, Identity & Compliance:

October 1, 2019 | 1:00 PM – 2:00 PM PT – Deep Dive on Running Active Directory on AWS – Learn how to deploy Active Directory on AWS and start migrating your Windows workloads.

 

Serverless:

October 2, 2019 | 9:00 AM – 10:00 AM PT – Deep Dive on Amazon EventBridge – Learn how to optimize event-driven applications, and use rules and policies to route, transform, and control access to these events that react to data from SaaS apps.

 

Storage:

September 24, 2019 | 9:00 AM – 10:00 AM PT – Optimize Your Amazon S3 Data Lake with S3 Storage Classes and Management Tools – Learn how to use the Amazon S3 Storage Classes and management tools to better manage your data lake at scale and to optimize storage costs and resources.

October 2, 2019 | 11:00 AM – 12:00 PM PT – The Great Migration to Cloud Storage: Choosing the Right Storage Solution for Your Workload – Learn more about AWS storage services and identify which service is the right fit for your business.

 

 

Extract Oracle OLTP data in real time with GoldenGate and query from Amazon Athena

Post Syndicated from Sreekanth Krishnavajjala original https://aws.amazon.com/blogs/big-data/extract-oracle-oltp-data-in-real-time-with-goldengate-and-query-from-amazon-athena/

This post describes how you can improve performance and reduce costs by offloading reporting workloads from an online transaction processing (OLTP) database to Amazon Athena and Amazon S3. The architecture described allows you to implement a reporting system and have an understanding of the data that you receive by being able to query it on arrival. In this solution:

  • Oracle GoldenGate generates a new row on the target for every change on the source to create Slowly Changing Dimension Type 2 (SCD Type 2) data.
  • Athena allows you to run ad hoc queries on the SCD Type 2 data.

Principles of a modern reporting solution

Advanced database solutions use a set of principles to help them build cost-effective reporting solutions. Some of these principles are:

  • Separate the reporting activity from the OLTP. This approach provides resource isolation and enables databases to scale for their respective workloads.
  • Use query engines running on top of distributed file systems like Hadoop Distributed File System (HDFS) and cloud object stores, such as Amazon S3. The advent of query engines that can run on top of open-source HDFS and cloud object stores further reduces the cost of implementing dedicated reporting systems.

Furthermore, you can use these principles when building reporting solutions:

  • To reduce licensing costs of the commercial databases, move the reporting activity to an open-source database.
  • Use a log-based, real-time, change data capture (CDC), data-integration solution, which can replicate OLTP data from source systems, preferably in real-time mode, and provide a current view of the data. You can enable the data replication between the source and the target reporting systems using database CDC solutions. The transaction log-based CDC solutions capture database changes noninvasively from the source database and replicate them to the target datastore or file systems.

Prerequisites

If you use GoldenGate with Kafka and are considering cloud migration, you can benefit from this post. This post also assumes prior knowledge of GoldenGate and does not detail steps to install and configure GoldenGate. Knowledge of Java and Maven is also assumed. Ensure that a VPC with three subnets is available for manual deployment.

Understanding the architecture of this solution

The following workflow diagram (Figure 1) illustrates the solution that this post describes:

  1. Amazon RDS for Oracle acts as the source.
  2. A GoldenGate CDC solution produces data for Amazon Managed Streaming for Apache Kafka (Amazon MSK). GoldenGate streams the database CDC data to the consumer, and Kafka topics in the MSK cluster receive the data from GoldenGate.
  3. The Apache Flink application running on Amazon EMR consumes the data and sinks it into an S3 bucket.
  4. Athena analyzes the data through queries. You can optionally run queries from Amazon Redshift Spectrum.

Data Pipeline

Figure 1

Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with a few clicks, without the need to provision servers, manage storage, or configure Apache ZooKeeper manually. Kafka is an open-source platform for building real-time streaming data pipelines and applications.

Amazon RDS for Oracle is a fully managed database that frees up your time to focus on application development. It manages time-consuming database administration tasks, including provisioning, backups, software patching, monitoring, and hardware scaling.

GoldenGate is a real-time, log-based, heterogeneous database CDC solution. GoldenGate supports data replication from any supported database to various target databases or big data platforms like Kafka. GoldenGate’s ability to write the transactional data captured from the source in different formats, including delimited text, JSON, and Avro, enables seamless integration with a variety of BI tools. Each row has additional metadata columns including database operation type (Insert/Update/Delete).

Flink is an open-source, stream-processing framework with a distributed streaming dataflow engine for stateful computations over unbounded and bounded data streams. EMR supports Flink, letting you create managed clusters from the AWS Management Console. Flink also supports exactly-once semantics with the checkpointing feature, which is vital to ensure data accuracy when processing database CDC data. You can also use Flink to transform the streaming data row by row or in batches using windowing capabilities.

S3 is an object storage service with high scalability, data availability, security, and performance. You can run big data analytics across your S3 objects with AWS query-in-place services like Athena.

Athena is a serverless query service that makes it easy to query and analyze data in S3. With Athena and S3 as a data source, you define the schema and start querying using standard SQL. There’s no need for complex ETL jobs to prepare your data for analysis, which makes it easy for anyone with SQL skills to analyze large-scale datasets quickly.

The following diagram shows a more detailed view of the data pipeline:

  1. RDS for Oracle runs in a Single-AZ.
  2. GoldenGate runs on an Amazon EC2 instance.
  3. The MSK cluster spans across three Availability Zones.
  4. Kafka topic is set up in MSK.
  5. Flink runs on an EMR Cluster.
  6. Producer Security Group for Oracle DB and GoldenGate instance.
  7. Consumer Security Group for EMR with Flink.
  8. Gateway endpoint for S3 private access.
  9. NAT Gateway to download software components on GoldenGate instance.
  10. S3 bucket and Athena.

For simplicity, this setup uses a single VPC with multiple subnets to deploy resources.

Figure 2

Configuring single-click deployment using AWS CloudFormation

The AWS CloudFormation template included in this post automates the deployment of the end-to-end solution that this blog post describes. The template provisions all required resources including RDS for Oracle, MSK, EMR, S3 bucket, and also adds an EMR step with a JAR file to consume messages from Kafka topic on MSK. Here’s the list of steps to launch the template and test the solution:

  1. Launch the AWS CloudFormation template in the us-east-1 Region. Stack creation takes approximately 15 to 20 minutes to complete.
  2. After successful stack creation, obtain the GoldenGate hub server public IP from the Outputs tab of the AWS CloudFormation stack.
  3. Log in to the GoldenGate hub server using the IP address from step 2 as ec2-user, and then switch to the oracle user: sudo su - oracle
  4. Connect to the source RDS for Oracle database using the sqlplus client and provide the password (source): sqlplus source@<Replace_With_Your_DB_alias>
  5. Generate database transactions using SQL statements available in oracle user’s home directory.
    SQL> @s
    
     SQL> @s1
    
     SQL> @s2

  6. Query the STOCK_TRADES table from the Amazon Athena console. It takes a few seconds after committing transactions on the source database for the changes to become available to Athena for querying.

Manually deploying components

The following steps describe the configurations required to stream Oracle-changed data to MSK and sink it to an S3 bucket using Flink running on EMR. You can then query the S3 bucket using Athena. If you deployed the solution using AWS CloudFormation as described in the previous step, skip to the Testing the solution section.

 

  1. Prepare an RDS source database for CDC using GoldenGate. The RDS source database version is Enterprise Edition 12.1.0.2.14. For instructions on configuring the RDS database, see Using Oracle GoldenGate with Amazon RDS. This post does not consider capturing data definition language (DDL).
  2. Configure an EC2 instance for the GoldenGate hub server. Configure the GoldenGate hub server using the Oracle Linux server 7.6 (ami-b9c38ad3) image in the us-east-1 Region. The GoldenGate hub server runs the GoldenGate extract process that extracts changes in real time from the database transaction log files. The server also runs a replicat process that publishes database changes to MSK. The GoldenGate hub server requires the following software components:
  • Java JDK 1.8.0 (required for GoldenGate big data adapter).
  • GoldenGate for Oracle (12.3.0.1.4) and GoldenGate for big data adapter (12.3.0.1).
  • Kafka 1.1.1 binaries (required for GoldenGate big data adapter classpath).
  • An IAM role attached to the GoldenGate hub server to allow access to the MSK cluster for GoldenGate processes running on the hub server. Use the GoldenGate (12.3.0) documentation to install and configure the GoldenGate for Oracle database. The GoldenGate Integrated Extract parameter file is eora2msk.prm.
    EXTRACT eora2msk
    SETENV (NLSLANG=AL32UTF8)
    
    USERID ggadmin@<Replace_With_Your_DB_alias>, password ggadmin
    TRANLOGOPTIONS INTEGRATEDPARAMS (max_sga_size 256)
    EXTTRAIL /u01/app/oracle/product/ogg/dirdat/or
    LOGALLSUPCOLS
    
    TABLE SOURCE.STOCK_TRADES;

    The logallsupcols extract parameter ensures that a full database table row is generated for every DML operation on the source, including updates and deletes.

  3. Create a Kafka cluster using MSK and configure a Kafka topic. You can create the MSK cluster from the AWS Management Console, using the AWS CLI, or through an AWS CloudFormation template.
  • Use the list-clusters command to obtain a ClusterArn and a Zookeeper connection string after creating the cluster. You need this information to configure the GoldenGate big data adapter and Flink consumer. The following code illustrates the commands to run:
    $aws kafka list-clusters --region us-east-1
    {
        "ClusterInfoList": [
            {
                "EncryptionInfo": {
                    "EncryptionAtRest": {
                        "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:xxxxxxxxxxxx:key/717d53d8-9d08-4bbb-832e-de97fadcaf00"
                    }
                }, 
                "BrokerNodeGroupInfo": {
                    "BrokerAZDistribution": "DEFAULT", 
                    "ClientSubnets": [
                        "subnet-098210ac85a046999", 
                        "subnet-0c4b5ee5ff5ef70f2", 
                        "subnet-076c99d28d4ee87b4"
                    ], 
                    "StorageInfo": {
                        "EbsStorageInfo": {
                            "VolumeSize": 1000
                        }
                    }, 
                    "InstanceType": "kafka.m5.large"
                }, 
                "ClusterName": "mskcluster", 
                "CurrentBrokerSoftwareInfo": {
                    "KafkaVersion": "1.1.1"
                }, 
                "CreationTime": "2019-01-24T04:41:56.493Z", 
                "NumberOfBrokerNodes": 3, 
                "ZookeeperConnectString": "10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181", 
                "State": "ACTIVE", 
                "CurrentVersion": "K13V1IB3VIYZZH", 
                "ClusterArn": "arn:aws:kafka:us-east-1:xxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3", 
                "EnhancedMonitoring": "DEFAULT"
            }
        ]
    }

  • Obtain the IP addresses of the Kafka broker nodes by using the ClusterArn.
    $aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3
    {
        "BootstrapBrokerString": "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092"
    }

  • Create a Kafka topic. The solution in this post uses the same name for the Kafka topic as the table name.
    ./kafka-topics.sh --create --zookeeper 10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181 --replication-factor 3 --partitions 1 --topic STOCK_TRADES

  4. Provision an EMR cluster with Flink. Create an EMR cluster 5.25 with Flink 1.8.0 (advanced option of the EMR cluster), and enable SSH access to the master node. Create and attach a role to the EMR master node so that Flink consumers can access the Kafka topic in the MSK cluster.
  5. Configure the Oracle GoldenGate big data adapter for Kafka on the GoldenGate hub server. Download and install the Oracle GoldenGate big data adapter (12.3.0.1.0) using the Oracle GoldenGate download link. For more information, see the Oracle GoldenGate 12c (12.3.0.1) installation documentation. The following is the GoldenGate producer property file for Kafka (custom_kafka_producer.properties):
    #Bootstrap broker string obtained from Step 3
    bootstrap.servers= 10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092
    #bootstrap.servers=localhost:9092
    acks=1
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    # 100KB per partition
    batch.size=16384
    linger.ms=0

    The following is the GoldenGate properties file for Kafka (Kafka.props):

    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    #The following resolves the topic name using the short table name
    #gg.handler.kafkahandler.topicName=SOURCE
    gg.handler.kafkahandler.topicMappingTemplate=${tableName}
    #The following selects the message key using the concatenated primary keys
    gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
    gg.handler.kafkahandler.format=json_row
    #gg.handler.kafkahandler.format=delimitedtext
    #gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
    #gg.handler.kafkahandler.SchemaTopicName=oratopic
    gg.handler.kafkahandler.BlockingSend =false
    gg.handler.kafkahandler.includeTokens=false
    gg.handler.kafkahandler.mode=op
    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE
    
    gg.log=log4j
    #gg.log.level=INFO
    gg.log.level=DEBUG
    gg.report.time=30sec
    gg.classpath=dirprm/:/home/oracle/kafka/kafka_2.11-1.1.1/libs/*
    
    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

    The following is the GoldenGate replicat parameter file (rkafka.prm):

    REPLICAT rkafka
    -- Trail file for this example is located in "AdapterExamples/trail" directory
    -- Command to add REPLICAT
    -- add replicat rkafka, exttrail AdapterExamples/trail/tr
    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 10000
    MAP SOURCE.STOCK_TRADES, TARGET SOURCE.STOCK_TRADES;

  6. Create an S3 bucket and directory with a table name underneath for Flink to store (sink) Oracle CDC data.
  7. Configure a Flink consumer to read from the Kafka topic that writes the CDC data to an S3 bucket. For instructions on setting up a Flink project using the Maven archetype, see Flink Project Build Setup. The following code example is the pom.xml file, used with the Maven project. For more information, see Getting Started with Maven.
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-quickstart-java</artifactId>
      <version>1.8.0</version>
      <packaging>jar</packaging>
    
      <name>flink-quickstart-java</name>
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <slf4j.version>@slf4j.version@</slf4j.version>
    <log4j.version>@log4j.version@</log4j.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
      </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-filesystem_2.11</artifactId>
         <version>1.8.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-presto</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
       <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.11</artifactId>
          <version>2.4.20</version>
        </dependency>
        <dependency>
           <groupId>com.typesafe.akka</groupId>
           <artifactId>akka-protobuf_2.11</artifactId>
           <version>2.4.20</version>
        </dependency>
</dependencies>
<build>
      <plugins>
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                       <execution>
                          <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                             </goals>
                           <configuration>
                          <artifactSet>
                      <excludes>
    
                             <!-- Excludes here -->
                               </excludes>
    </artifactSet>
                    <filters>
                            <filter>
                                                                                     <artifact>org.apache.flink:*</artifact>
                            </filter>
                       </filters>
                 <transformers>
                   <!-- add Main-Class to manifest file -->
                                                                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                                                            <mainClass>flinkconsumer.flinkconsumer</mainClass>
                   </transformer>
                                                                             <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                                                         <resource>reference.conf</resource>
                                                                    </transformer>
                                                            </transformers>
                    <relocations>
                          <relocation>
                                                                <pattern>org.codehaus.plexus.util</pattern>
                                                                  <shadedPattern>org.shaded.plexus.util</shadedPattern>
                        <excludes>
                                                                      <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                                                      <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                                                  </excludes>
                                                               </relocation>
                                                            </relocations>
                                                            <createDependencyReducedPom>false</createDependencyReducedPom>
                                                    </configuration>
                                            </execution>
                                    </executions>
                            </plugin>
    <!-- Add the main class as a manifest entry -->
                            <plugin>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-jar-plugin</artifactId>
                                    <version>2.5</version>
                                    <configuration>
                                            <archive>
                                                    <manifestEntries>
                                                            <Main-Class>flinkconsumer.flinkconsumer</Main-Class>
                                                    </manifestEntries>
                                            </archive>
           </configuration>
                            </plugin>
    
                            <plugin>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <version>3.1</version>
                                    <configuration>
                                            <source>1.7</source>
                                            <target>1.7</target>
                                    </configuration>
                            </plugin>
                    </plugins>
    
    </build>
    <profiles>
                    <profile>
                            <id>build-jar</id>
                            <activation>
                                    <activeByDefault>false</activeByDefault>
                            </activation>
                    </profile>
            </profiles>
    
    
    </project>

    Compile the following Java program with mvn clean install to generate the JAR file:

    package flinkconsumer;
    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.*;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class flinkconsumer {

        public static void main(String[] args) throws Exception {
            // Create the streaming execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setBufferTimeout(1000);
            // Checkpoint every 5 seconds and keep checkpoint state on HDFS.
            env.enableCheckpointing(5000);
            env.setStateBackend(new FsStateBackend("hdfs://ip-10-0-3-12.ec2.internal:8020/flink/checkpoints"));

            // Consumer properties for the Amazon MSK cluster (broker endpoints from the earlier setup).
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092");
            properties.setProperty("group.id", "flink");
            properties.setProperty("client.id", "demo1");

            // Read the GoldenGate change records from the STOCK_TRADES topic as plain strings.
            DataStream<String> message = env.addSource(
                new FlinkKafkaConsumer<>("STOCK_TRADES", new SimpleStringSchema(), properties));

            // Write the records to Amazon S3, rolling to a new part file every 400 bytes
            // so that output appears quickly in this demo.
            RollingSink<String> sink = new RollingSink<String>("s3://flink-stream-demo/STOCK_TRADES");
            // sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm"));
            sink.setBatchSize(400);

            message.map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                @Override
                public String map(String value) throws Exception {
                    // Pass each JSON document through unchanged.
                    return value;
                }
            }).addSink(sink).setParallelism(1);

            env.execute();
        }
    }
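
    The RollingSink used above comes from the flink-connector-filesystem dependency and is deprecated in newer Flink releases in favor of StreamingFileSink. If you upgrade later, the following is a minimal sketch, not part of the original solution, of an equivalent row-format S3 sink; note that StreamingFileSink generally requires the Hadoop-based S3 file system plugin rather than the Presto-based one declared in the pom:

    package flinkconsumer;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    // Hypothetical helper class for illustration only.
    public class S3SinkFactory {
        // Builds a sink that writes each record as a UTF-8 line to the same S3 prefix
        // the RollingSink above targets; swap it in with message.addSink(S3SinkFactory.create()).
        public static StreamingFileSink<String> create() {
            return StreamingFileSink
                .forRowFormat(new Path("s3://flink-stream-demo/STOCK_TRADES"),
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();
        }
    }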

    Log in to the EMR master node as the Hadoop user, start Flink, and run the JAR file:

    $ /usr/bin/flink run ./flink-quickstart-java-1.8.0.jar

  5. Create the stock_trades table from the Athena console. Each JSON document must be on a new line.
    CREATE EXTERNAL TABLE `stock_trades`(
      `trade_id` string COMMENT 'from deserializer', 
      `ticker_symbol` string COMMENT 'from deserializer', 
      `units` int COMMENT 'from deserializer', 
      `unit_price` float COMMENT 'from deserializer', 
      `trade_date` timestamp COMMENT 'from deserializer', 
      `op_type` string COMMENT 'from deserializer')
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://flink-stream-demo/STOCK_TRADES'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'transient_lastDdlTime'='1561051196')

    For more information, see Hive JSON SerDe.

Testing the solution

To test that the solution works, complete the following steps:

  1. Log in to the source RDS instance from the GoldenGate hub server and perform insert, update, and delete operations on the stock_trades table:
    $sqlplus [email protected]
    SQL> insert into stock_trades values(6,'NEW',29,75,sysdate);
    SQL> update stock_trades set units=999 where trade_id=6;
    SQL> insert into stock_trades values(7,'TEST',30,80,SYSDATE);
    SQL> insert into stock_trades values(8,'XYZC',20,1800,sysdate);
    SQL> commit;

  2. Monitor the GoldenGate capture from the source database using the following stats command:
    [[email protected] 12.3.0]$ pwd
    /u02/app/oracle/product/ogg/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate Command Interpreter for Oracle
    Version 12.3.0.1.4 OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO
    Linux, x64, 64bit (optimized), Oracle 12c on Apr 16 2018 00:53:30
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats eora2msk

  3. Monitor the GoldenGate replicat that delivers to the Kafka topic using the following stats command:
    [[email protected] 12.3.0]$ pwd
    /u03/app/oracle/product/ogg/bdata/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate for Big Data
    Version 12.3.2.1.1 (Build 005)
    
    Oracle GoldenGate Command Interpreter
    Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
    Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats rkafka

  4. Query the stock_trades table using the Athena console and verify that the inserted and updated rows appear; a sample query follows.
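
    The following query is a sketch for verification; the trade_id values match the sample inserts above, and op_type is the operation indicator that GoldenGate adds (typically I for insert, U for update, and D for delete):

    SELECT trade_id,
           ticker_symbol,
           units,
           unit_price,
           op_type
    FROM stock_trades
    WHERE trade_id IN (6, 7, 8)
    ORDER BY trade_id;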

Summary

This post illustrates how you can offload reporting activity from the source OLTP database to Amazon Athena and Amazon S3, which reduces reporting costs and improves OLTP performance on the source database. It serves as a guide for setting up the solution in a staging environment.

Deploying this solution in production may require additional considerations, such as high availability for the GoldenGate hub servers, different file encoding formats for optimal query performance, and security requirements. You can also achieve similar outcomes with AWS Database Migration Service instead of GoldenGate for database CDC, and with Kafka Connect for the Amazon S3 sink.

About the Authors

Sreekanth Krishnavajjala is a solutions architect at Amazon Web Services.

Vinod Kataria is a senior partner solutions architect at Amazon Web Services.