Tag Archives: Internet of Things

Build a dynamic rules engine with Amazon Managed Service for Apache Flink

Post Syndicated from Steven Carpenter original https://aws.amazon.com/blogs/big-data/build-a-dynamic-rules-engine-with-amazon-managed-service-for-apache-flink/

Imagine you have some streaming data. It could be from an Internet of Things (IoT) sensor, log data ingestion, or even shopper impression data. Regardless of the source, you have been tasked with acting on the data—alerting or triggering when something occurs. Martin Fowler says: “You can build a simple rules engine yourself. All you need is to create a bunch of objects with conditions and actions, store them in a collection, and run through them to evaluate the conditions and execute the actions.”

A business rules engine (or simply rules engine) is a software system that executes many rules based on some input to determine some output. Simplistically, it’s a lot of “if then,” “and,” and “or” statements that are evaluated on some data. There are many different business rule systems, such as Drools, OpenL Tablets, or even RuleBook, and they all share a commonality: they define rules (collection of objects with conditions) that get executed (evaluate the conditions) to derive an output (execute the actions). The following is a simplistic example:

if (office_temperature) < 50 degrees => send an alert

if (office_temperature) < 50 degrees AND (occupancy_sensor) == TRUE => trigger action to turn on heat

When a single condition or a composition of conditions evaluates to true, we want to send out an alert so that we can act on that event (for example, trigger the heat to warm the 50-degree room).
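In code, Fowler’s description can be as small as a collection of condition/action pairs evaluated against incoming data. The following minimal Java sketch illustrates that idea only; the SimpleRule record and the attribute names are illustrative and are not part of the solution built later in this post.

// A minimal, self-contained sketch of a rules engine: rules are objects with a
// condition and an action, stored in a collection and evaluated against input data.
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

record SimpleRule(Predicate<Map<String, Object>> condition, Consumer<Map<String, Object>> action) {}

public class TinyRulesEngine {
    public static void main(String[] args) {
        List<SimpleRule> rules = List.of(
                new SimpleRule(
                        data -> (double) data.get("office_temperature") < 50,
                        data -> System.out.println("ALERT: office is cold")),
                new SimpleRule(
                        data -> (double) data.get("office_temperature") < 50
                                && Boolean.TRUE.equals(data.get("occupancy_sensor")),
                        data -> System.out.println("ACTION: turn on heat")));

        Map<String, Object> reading = Map.of("office_temperature", 48.0, "occupancy_sensor", true);

        // Run through the collection, evaluate the conditions, execute the actions
        rules.stream()
                .filter(rule -> rule.condition().test(reading))
                .forEach(rule -> rule.action().accept(reading));
    }
}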

This post demonstrates how to implement a dynamic rules engine using Amazon Managed Service for Apache Flink. Our implementation lets you create and update rules dynamically, without changing or redeploying the underlying code or implementation of the rules engine itself. We discuss the architecture, the key services of the implementation, some implementation details that you can use to build your own rules engine, and an AWS Cloud Development Kit (AWS CDK) project to deploy this in your own account.

Solution overview

The workflow of our solution starts with the ingestion of the data. We assume that we have some source data. It could come from a variety of places, but for this demonstration, we use streaming data (IoT sensor data) as our input. This is the data we will evaluate our rules against. For example purposes, let’s assume we are looking at data from our AnyCompany Home Thermostat. We’ll see attributes like temperature, occupancy, humidity, and more. The thermostat publishes the respective values every minute, so we’ll base our rules around that cadence. Because we’re ingesting this data in near real time, we need a service designed specifically for this use case. For this solution, we use Amazon Kinesis Data Streams.

In a traditional rules engine, there may be a finite list of rules. The creation of new rules would likely involve a revision and redeployment of the code base, a replacement of some rules file, or some overwriting process. However, a dynamic rules engine is different. Much like our streaming input data, our rules can also be streamed. Here we use Kinesis Data Streams to stream our rules as they are created.

At this point, we have two streams of data:

  • The raw data from our thermostat
  • The business rules, perhaps created through a user interface

The following diagram illustrates how we can connect these streams together.

Architecture Diagram

Connecting streams

A typical use case for Managed Service for Apache Flink is to interactively query and analyze data in real time and continuously produce insights for time-sensitive use cases. With this in mind, if you have a rule that corresponds to the temperature dropping below a certain value (especially in winter), it might be critical to evaluate the rule and produce a result as quickly as possible.

Apache Flink connectors are software components that move data into and out of a Managed Service for Apache Flink application. Connectors are flexible integrations that let you read from files and directories. They consist of complete modules for interacting with AWS services and third-party systems. For more details about connectors, see Use Apache Flink connectors with Managed Service for Apache Flink.

We use two types of connectors (operators) for this solution:

  • Sources – Provide input to your application from a Kinesis data stream, file, or other data source
  • Sinks – Send output from your application to a Kinesis data stream, Amazon Data Firehose stream, or other data destination

Flink applications are streaming dataflows that may be transformed by user-defined operators. These dataflows form directed graphs that start with one or more sources and end in one or more sinks. The following diagram illustrates an example dataflow (source). As previously discussed, we have two Kinesis data streams that can be used as sources for our Flink program.

Flink Data Flow

The following code snippet shows how we have our Kinesis sources set up within our Flink code:

/**
* Creates a DataStream of Rule objects by consuming rule data from a Kinesis
* stream.
*
* @param env The StreamExecutionEnvironment for the Flink job
* @return A DataStream of Rule objects
* @throws IOException if an error occurs while reading Kinesis properties
*/
private DataStream<Rule> createRuleStream(StreamExecutionEnvironment env, Properties sourceProperties)
                throws IOException {
        String RULES_SOURCE = KinesisUtils.getKinesisRuntimeProperty("kinesis", "rulesTopicName");
        FlinkKinesisConsumer<String> kinesisConsumer = new FlinkKinesisConsumer<>(RULES_SOURCE,
                        new SimpleStringSchema(),
                        sourceProperties);
        DataStream<String> rulesStrings = env.addSource(kinesisConsumer)
                        .name("RulesStream")
                        .uid("rules-stream");
        return rulesStrings.flatMap(new RuleDeserializer()).name("Rule Deserialization");
}

/**
* Creates a DataStream of SensorEvent objects by consuming sensor event data
* from a Kinesis stream.
*
* @param env The StreamExecutionEnvironment for the Flink job
* @return A DataStream of SensorEvent objects
* @throws IOException if an error occurs while reading Kinesis properties
*/
private DataStream<SensorEvent> createSensorEventStream(StreamExecutionEnvironment env,
            Properties sourceProperties) throws IOException {
    String DATA_SOURCE = KinesisUtils.getKinesisRuntimeProperty("kinesis", "dataTopicName");
    FlinkKinesisConsumer<String> kinesisConsumer = new FlinkKinesisConsumer<>(DATA_SOURCE,
                    new SimpleStringSchema(),
                    sourceProperties);
    DataStream<String> transactionsStringsStream = env.addSource(kinesisConsumer)
                    .name("EventStream")
                    .uid("sensor-events-stream");

    return transactionsStringsStream.flatMap(new JsonDeserializer<>(SensorEvent.class))
                    .returns(SensorEvent.class)
                    .flatMap(new TimeStamper<>())
                    .returns(SensorEvent.class)
                    .name("Transactions Deserialization");
}

We use a broadcast state, which can be used to combine and jointly process two streams of events in a specific way. A broadcast state is a good fit for applications that need to join a low-throughput stream and a high-throughput stream or need to dynamically update their processing logic. The following diagram illustrates an example of how the broadcast state is connected. For more details, see A Practical Guide to Broadcast State in Apache Flink.

Broadcast State

This fits the idea of our dynamic rules engine, where we have a low-throughput rules stream (added to as needed) and a high-throughput transactions stream (coming in at a regular interval, such as one per minute). This broadcast stream allows us to take our transactions stream (or the thermostat data) and connect it to the rules stream as shown in the following code snippet:

// Processing pipeline setup
DataStream<Alert> alerts = sensorEvents
    .connect(rulesStream)
    .process(new DynamicKeyFunction())
    .uid("partition-sensor-data")
    .name("Partition Sensor Data by Equipment and RuleId")
    .keyBy((equipmentSensorHash) -> equipmentSensorHash.getKey())
    .connect(rulesStream)
    .process(new DynamicAlertFunction())
    .uid("rule-evaluator")
    .name("Rule Evaluator");

To learn more about the broadcast state, see The Broadcast State Pattern. When the broadcast stream is connected to the data stream (as in the preceding example), it becomes a BroadcastConnectedStream. The function applied to this stream, which allows us to process both the transactions and the rules, extends KeyedBroadcastProcessFunction, which provides three methods to process records and emit results:

  • processBroadcastElement() – This is called for each record of the broadcasted stream (our rules stream).
  • processElement() – This is called for each record of the keyed stream. It provides read-only access to the broadcast state to prevent modifications that result in different broadcast states across the parallel instances of the function. The processElement method retrieves the rule from the broadcast state and the previous sensor event of the keyed state. If the expression evaluates to TRUE (discussed in the next section), an alert will be emitted.
  • onTimer() – This is called when a previously registered timer fires. Timers can be registered in the processElement method and are used to perform computations or clean up states in the future. This is used in our code to make sure any old data (as defined by our rule) is evicted as necessary.

We can handle the rule in the broadcast state instance as follows:

@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) throws Exception {
    BroadcastState<String, Rule> broadcastState = ctx.getBroadcastState(RulesEvaluator.Descriptors.rulesDescriptor);
    Long currentProcessTime = System.currentTimeMillis();
    // If we get a new rule, we'll give it insufficient data rule op status
    if (!broadcastState.contains(rule.getId())) {
        outputRuleOpData(rule, OperationStatus.INSUFFICIENT_DATA, currentProcessTime, ctx);
    }
    ProcessingUtils.handleRuleBroadcast(rule, broadcastState);
}

static void handleRuleBroadcast(Rule rule, BroadcastState<String, Rule> broadcastState)
        throws Exception {
    switch (rule.getStatus()) {
        case ACTIVE:
            broadcastState.put(rule.getId(), rule);
            break;
        case INACTIVE:
            broadcastState.remove(rule.getId());
            break;
    }
}

Notice what happens in the code when the rule status is INACTIVE. This removes the rule from the broadcast state, so it is no longer considered for evaluation. Similarly, handling the broadcast of a rule that is ACTIVE adds or replaces the rule within the broadcast state. This allows us to make changes dynamically, adding and removing rules as necessary.
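The post’s snippets don’t show the processElement side, so the following is a minimal sketch of the logic described earlier: read the rules from the read-only broadcast state, evaluate each one against the incoming sensor event, and emit an alert when a rule matches. The evaluateRule helper and the CLEANUP_INTERVAL_MS constant are placeholders, not part of the published code.

@Override
public void processElement(SensorEvent event, ReadOnlyContext ctx, Collector<Alert> out)
        throws Exception {
    // Read-only view of the rules that processBroadcastElement stored
    ReadOnlyBroadcastState<String, Rule> rulesState =
            ctx.getBroadcastState(RulesEvaluator.Descriptors.rulesDescriptor);

    for (Map.Entry<String, Rule> entry : rulesState.immutableEntries()) {
        Rule rule = entry.getValue();
        // evaluateRule stands in for the JEXL evaluation covered in the next section
        boolean isAlertTriggered = evaluateRule(rule, event);
        if (isAlertTriggered) {
            out.collect(new Alert(rule.getEquipmentName(), rule.getName(), rule.getId(),
                    AlertStatus.START, List.of(event), System.currentTimeMillis()));
        }
    }

    // Timers registered here drive the onTimer() state cleanup described above
    ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + CLEANUP_INTERVAL_MS);
}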

Evaluating rules

Rules can be evaluated in a variety of ways. Although it’s not a requirement, our rules were created in a Java Expression Language (JEXL) compatible format. This allows us to evaluate rules by providing a JEXL expression along with the appropriate context (the data needed to evaluate the rule, as key-value pairs), and simply calling the evaluate method:

JexlExpression expression = jexl.createExpression(rule.getRuleExpression());
Boolean isAlertTriggered = (Boolean) expression.evaluate(context);

A powerful feature of JEXL is that not only can it support simple expressions (such as those including comparison and arithmetic), it also supports user-defined functions. JEXL allows you to call any method on a Java object using the same syntax. If there is a POJO with the name SENSOR_cebb1baf_2df0_4267_b489_28be562fccea that has the method hasNotChanged, you would call that method directly in the expression. You can find more of the user-defined functions that we used within our SensorMapState class.

Let’s look at an example of how this works, using a rule expression that reads as follows:

"SENSOR_cebb1baf_2df0_4267_b489_28be562fccea.hasNotChanged(5)"

This rule, evaluated by JEXL, checks whether the sensor value hasn’t changed in the last 5 minutes.

The corresponding user-defined function (part of SensorMapState) that is exposed to JEXL (using the context) is as follows:

public Boolean hasNotChanged(Integer time) {
    Long minutesSinceChange = getMinutesSinceChange();
    log.debug("Time: " + time + " | Minutes since change: " + minutesSinceChange);
    return minutesSinceChange > time;
}

Relevant data, like the following, would go into the context, which is then used to evaluate the rule:

{
    "id": "SENSOR_cebb1baf_2df0_4267_b489_28be562fccea",
    "measureValue": 10,
    "eventTimestamp": 1721666423000
}

In this case, the result (or value of isAlertTriggered) is TRUE.
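To make the mechanics concrete, here is a small, self-contained sketch of how the expression and context fit together, assuming Apache Commons JEXL 3. The SensorState class is a stand-in for the real SensorMapState entry that the Flink job exposes to JEXL.

import org.apache.commons.jexl3.JexlBuilder;
import org.apache.commons.jexl3.JexlContext;
import org.apache.commons.jexl3.JexlEngine;
import org.apache.commons.jexl3.JexlExpression;
import org.apache.commons.jexl3.MapContext;

public class JexlRuleExample {
    // Stand-in for the SensorMapState object exposed to JEXL; illustrative only
    public static class SensorState {
        public Boolean hasNotChanged(Integer minutes) {
            return true; // the real implementation compares timestamps, as shown above
        }
    }

    public static void main(String[] args) {
        JexlEngine jexl = new JexlBuilder().create();
        JexlExpression expression = jexl.createExpression(
                "SENSOR_cebb1baf_2df0_4267_b489_28be562fccea.hasNotChanged(5)");

        // The context maps the sensor ID used in the expression to the object
        // exposing hasNotChanged(), so JEXL can resolve the method call
        JexlContext context = new MapContext();
        context.set("SENSOR_cebb1baf_2df0_4267_b489_28be562fccea", new SensorState());

        Boolean isAlertTriggered = (Boolean) expression.evaluate(context);
        System.out.println(isAlertTriggered); // prints true
    }
}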

Creating sinks

Much like we previously created sources, we can also create sinks. These sinks serve as the end of our stream processing, where our analyzed and evaluated results are emitted for future use. Like our source, our sink is also a Kinesis data stream, where a downstream Lambda consumer iterates over the records and processes them to take the appropriate action. There are many applications of downstream processing; for example, we can persist this evaluation result, create a push notification, or update a rule dashboard.

Based on the previous evaluation, we have the following logic within the process function itself:

if (isAlertTriggered) {
    alert = new Alert(rule.getEquipmentName(), rule.getName(), rule.getId(), AlertStatus.START,
            triggeringEvents, currentEvalTime);
    log.info("Pushing {} alert for {}", AlertStatus.START, rule.getName());
}
out.collect(alert);

When the process function emits the alert, the alert response is sent to the sink, which then can be read and used downstream in the architecture:

alerts.flatMap(new JsonSerializer<>(Alert.class))
    .name("Alerts Serialization").sinkTo(createAlertSink(sinkProperties))
    .uid("alerts-json-sink")
    .name("Alerts JSON Sink");

At this point, we can process the alert downstream. We have a Lambda function logging the records, where we can see the following:

{
   "equipmentName":"THERMOSTAT_1",
   "ruleName":"RuleTest2",
   "ruleId":"cda160c0-c790-47da-bd65-4abae838af3b",
   "status":"START",
   "triggeringEvents":[
      {
         "equipment":{
            "id":"THERMOSTAT_1",
         },
         "id":"SENSOR_cebb1baf_2df0_4267_b489_28be562fccea",
         "measureValue":20.0,
         "eventTimestamp":1721672715000,
         "ingestionTimestamp":1721741792958
      }
   ],
   "timestamp":1721741792790
}

Although simplified in this example, these code snippets form the basis for taking the evaluation results and sending them elsewhere.
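The Lambda consumer itself isn’t part of the published snippets either. As a rough sketch, a Java handler that reads the alert records from the Kinesis stream and logs them (the class name and any downstream handling are illustrative) might look like this:

import java.nio.charset.StandardCharsets;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class AlertConsumerHandler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(KinesisEvent event, Context context) {
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            // Each record's payload is the JSON alert emitted by the Flink sink
            String alertJson = StandardCharsets.UTF_8
                    .decode(record.getKinesis().getData())
                    .toString();
            context.getLogger().log(alertJson);
            // Downstream actions (notifications, persistence, dashboards) would go here
        }
        return null;
    }
}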

Conclusion

In this post, we demonstrated how to implement a dynamic rules engine using Managed Service for Apache Flink with both the rules and input data streamed through Kinesis Data Streams. You can learn more about it with the e-learning that we have available.

As companies seek to implement near real-time rules engines, this architecture presents a compelling solution. Managed Service for Apache Flink offers powerful capabilities for transforming and analyzing streaming data in real time, while simplifying the management of Flink workloads and seamlessly integrating with other AWS services.

To help you get started with this architecture, we’re excited to announce that we’ll be publishing our complete rules engine code as a sample on GitHub. This comprehensive example will go beyond the code snippets provided in our post, offering a deeper look into the intricacies of building a dynamic rules engine with Flink.

We encourage you to explore this sample code, adapt it to your specific use case, and take advantage of the full potential of real-time data processing in your applications. Check out the GitHub repository, and don’t hesitate to reach out with any questions or feedback as you embark on your journey with Flink and AWS!


About the Authors

Steven Carpenter is a Senior Solution Developer on the AWS Industries Prototyping and Customer Engineering (PACE) team, helping AWS customers bring innovative ideas to life through rapid prototyping on the AWS platform. He holds a master’s degree in Computer Science from Wayne State University in Detroit, Michigan. Connect with Steven on LinkedIn!

Aravindharaj Rajendran is a Senior Solution Developer within the AWS Industries Prototyping and Customer Engineering (PACE) team, based in Herndon, VA. He helps AWS customers materialize their innovative ideas by rapid prototyping using the AWS platform. Outside of work, he loves playing PC games, Badminton and Traveling.

Israel’s Pager Attacks and Supply Chain Vulnerabilities

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/09/israels-pager-attacks.html

Israel’s brazen attacks on Hezbollah last week, in which hundreds of pagers and two-way radios exploded and killed at least 37 people, graphically illustrated a threat that cybersecurity experts have been warning about for years: Our international supply chains for computerized equipment leave us vulnerable. And we have no good means to defend ourselves.

Though the deadly operations were stunning, none of the elements used to carry them out were particularly new. The tactics employed by Israel, which has neither confirmed nor denied any role, to hijack an international supply chain and embed plastic explosives in Hezbollah devices have been used for years. What’s new is that Israel put them together in such a devastating and extravagantly public fashion, bringing into stark relief what the future of great power competition will look like—in peacetime, wartime and the ever expanding gray zone in between.

The targets won’t be just terrorists. Our computers are vulnerable, and increasingly so are our cars, our refrigerators, our home thermostats and many other useful things in our orbits. Targets are everywhere.

The core component of the operation, implanting plastic explosives in pagers and radios, has been a terrorist risk since Richard Reid, the so-called shoe bomber, tried to ignite some on an airplane in 2001. That’s what all of those airport scanners are designed to detect—both the ones you see at security checkpoints and the ones that later scan your luggage. Even a small amount can do an impressive degree of damage.

The second component, assassination by personal device, isn’t new, either. Israel used this tactic against a Hamas bomb maker in 1996 and a Fatah activist in 2000. Both were killed by remotely detonated booby-trapped cellphones.

The final and more logistically complex piece of Israel’s plan, attacking an international supply chain to compromise equipment at scale, is something that the United States has done, though for different purposes. The National Security Agency has intercepted communications equipment in transit and modified it not for destructive purposes but for eavesdropping. We know from an Edward Snowden document that the agency did this to a Cisco router destined for a Syrian telecommunications company. Presumably, this wasn’t the agency’s only operation of this type.

Creating a front company to fool victims isn’t even a new twist. Israel reportedly created a shell company to produce and sell explosive-laden devices to Hezbollah. In 2019 the FBI created a company that sold supposedly secure cellphones to criminals—not to assassinate them but to eavesdrop on and then arrest them.

The bottom line: Our supply chains are vulnerable, which means that we are vulnerable. Any individual, country or group that interacts with a high-tech supply chain can subvert the equipment passing through it. It can be subverted to eavesdrop. It can be subverted to degrade or fail on command. And although it’s harder, it can be subverted to kill.

Personal devices connected to the internet—and countries where they are in high use, such as the United States—are especially at risk. In 2007 the Idaho National Laboratory demonstrated that a cyberattack could cause a high-voltage generator to explode. In 2010 a computer virus believed to have been developed by the United States and Israel destroyed centrifuges at an Iranian nuclear facility. A 2017 dump of CIA documents included statements about the possibility of remotely hacking cars, which WikiLeaks asserted could be used to carry out “nearly undetectable assassinations.” This isn’t just theoretical: In 2015 a Wired reporter allowed hackers to remotely take over his car while he was driving it. They disabled the engine while he was on a highway.

The world has already begun to adjust to this threat. Many countries are increasingly wary of buying communications equipment from countries they don’t trust. The United States and others are banning large routers from the Chinese company Huawei because we fear that they could be used for eavesdropping and—even worse—disabled remotely in a time of escalating hostilities. In 2019 there was a minor panic over Chinese-made subway cars that could have been modified to eavesdrop on their riders.

It’s not just finished equipment that is under the scanner. More than a decade ago, the US military investigated the security risks of using Chinese parts in its equipment. In 2018 a Bloomberg report revealed US investigators had accused China of modifying computer chips to steal information.

It’s not obvious how to defend against these and similar attacks. Our high-tech supply chains are complex and international. It didn’t raise any red flags to Hezbollah that the group’s pagers came from a Hungary-based company that sourced them from Taiwan, because that sort of thing is perfectly normal. Most of the electronics Americans buy come from overseas, including our iPhones, whose parts come from dozens of countries before being pieced together primarily in China.

That’s a hard problem to fix. We can’t imagine Washington passing a law requiring iPhones to be made entirely in the United States. Labor costs are too high, and our country doesn’t have the domestic capacity to make these things. Our supply chains are deeply, inexorably international, and changing that would require bringing global economies back to the 1980s.

So what happens now? As for Hezbollah, its leaders and operatives will no longer be able to trust equipment connected to a network—very likely one of the primary goals of the attacks. And the world will have to wait to see if there are any long-term effects of this attack and how the group will respond.

But now that the line has been crossed, other countries will almost certainly start to consider this sort of tactic as within bounds. It could be deployed against a military during a war or against civilians in the run-up to a war. And developed countries like the United States will be especially vulnerable, simply because of the sheer number of vulnerable devices we have.

This essay originally appeared in The New York Times.

Hacking Wireless Bicycle Shifters

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/08/hacking-wireless-bicycle-shifters.html

This is yet another insecure Internet-of-things story, this one about wireless gear shifters for bicycles. These gear shifters are used in big-money professional bicycle races like the Tour de France, which provides an incentive to actually implement this attack.

Research paper. Another news story.

Slashdot thread.

Robot Dog Internet Jammer

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/07/robot-dog-internet-jammer.html

Supposedly the DHS has these:

The robot, called “NEO,” is a modified version of the “Quadruped Unmanned Ground Vehicle” (Q-UGV) sold to law enforcement by a company called Ghost Robotics. Benjamine Huffman, the director of DHS’s Federal Law Enforcement Training Centers (FLETC), told police at the 2024 Border Security Expo in Texas that DHS is increasingly worried about criminals setting “booby traps” with internet of things and smart home devices, and that NEO allows DHS to remotely disable the home networks of a home or building law enforcement is raiding. The Border Security Expo is open only to law enforcement and defense contractors. A transcript of Huffman’s speech was obtained by the Electronic Frontier Foundation’s Dave Maass using a Freedom of Information Act request and was shared with 404 Media.

“NEO can enter a potentially dangerous environment to provide video and audio feedback to the officers before entry and allow them to communicate with those in that environment,” Huffman said, according to the transcript. “NEO carries an onboard computer and antenna array that will allow officers the ability to create a ‘denial-of-service’ (DoS) event to disable ‘Internet of Things’ devices that could potentially cause harm while entry is made.”

Slashdot thread.

Building a scalable streaming data platform that enables real-time and batch analytics of electric vehicles on AWS

Post Syndicated from Ayush Agrawal original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-platform-that-enables-real-time-and-batch-analytics-of-electric-vehicles-on-aws/

The automobile industry has undergone a remarkable transformation because of the increasing adoption of electric vehicles (EVs). EVs, known for their sustainability and eco-friendliness, are paving the way for a new era in transportation. As environmental concerns and the push for greener technologies have gained momentum, the adoption of EVs has surged, promising to reshape our mobility landscape.

The surge in EVs brings with it a profound need for data acquisition and analysis to optimize their performance, reliability, and efficiency. In the rapidly evolving EV industry, the ability to harness, process, and derive insights from the massive volume of data generated by EVs has become essential for manufacturers, service providers, and researchers alike.

As the EV market is expanding with many new and incumbent players trying to capture the market, the major differentiating factor will be the performance of the vehicles.

Modern EVs are equipped with an array of sensors and systems that continuously monitor various aspects of their operation including parameters such as voltage, temperature, vibration, speed, and so on. From battery management to motor performance, these data-rich machines provide a wealth of information that, when effectively captured and analyzed, can revolutionize vehicle design, enhance safety, and optimize energy consumption. The data can be used to do predictive maintenance, device anomaly detection, real-time customer alerts, remote device management, and monitoring.

However, managing this deluge of data isn’t without its challenges. As the adoption of EVs accelerates, the need for robust data pipelines capable of collecting, storing, and processing data from an exponentially growing number of vehicles becomes more pronounced. Moreover, the granularity of data generated by each vehicle has increased significantly, making it essential to efficiently handle the ever-increasing number of data points. The challenges include not only the technical intricacies of data management but also concerns related to data security, privacy, and compliance with evolving regulations.

In this blog post, we delve into the intricacies of building a reliable data analytics pipeline that can scale to accommodate millions of vehicles, each generating hundreds of metrics every second using Amazon OpenSearch Ingestion. We also provide guidelines and sample configurations to help you implement a solution.

Of the prerequisites that follow, the IoT topic rule and the Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster can be set up by following How to integrate AWS IoT Core with Amazon MSK. The steps to create an Amazon OpenSearch Service cluster are available in Creating and managing Amazon OpenSearch Service domains.

Prerequisites

Before you begin implementing the solution, you need the following:

  • IoT topic rule
  • Amazon MSK Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM) cluster
  • Amazon OpenSearch Service domain

Solution overview

The following architecture diagram provides a scalable and fully managed modern data streaming platform. The architecture uses Amazon OpenSearch Ingestion to stream data into OpenSearch Service and Amazon Simple Storage Service (Amazon S3) to store the data. The data in OpenSearch powers real-time dashboards. The data can also be used to notify customers of any failures occurring on the vehicle (see Configuring alerts in Amazon OpenSearch Service). The data in Amazon S3 is used for business intelligence and long-term storage.

Architecture diagram

In the following sections, we focus on the following three critical pieces of the architecture in depth:

1. Amazon MSK to OpenSearch ingestion pipeline

2. Amazon OpenSearch Ingestion pipeline to OpenSearch Service

3. Amazon OpenSearch Ingestion to Amazon S3

Solution walkthrough

Step 1: MSK to Amazon OpenSearch Ingestion pipeline

Because each electric vehicle streams massive volumes of data to Amazon MSK clusters through AWS IoT Core, making sense of this data avalanche is critical. OpenSearch Ingestion provides a fully managed serverless integration to tap into these data streams.

The Amazon MSK source in OpenSearch Ingestion uses Kafka’s Consumer API to read records from one or more MSK topics. The MSK source in OpenSearch Ingestion seamlessly connects to MSK to ingest the streaming data into OpenSearch Ingestion’s processing pipeline.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline used to ingest data from an MSK cluster.

While creating an OpenSearch Ingestion pipeline, add the following snippet in the Pipeline configuration section.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                  
      topics: 
         - name: "ev-device-topic " 
           group_id: "opensearch-consumer" 
           serde_format: json                 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam:: ::<<account-id>>:role/opensearch-pipeline-Role"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>" 

When configuring Amazon MSK and OpenSearch Ingestion, it’s essential to establish an optimal relationship between the number of partitions in your Kafka topics and the number of OpenSearch Compute Units (OCUs) allocated to your ingestion pipelines. This optimal configuration ensures efficient data processing and maximizes throughput. You can read more about it in Configure recommended compute units (OCUs) for the Amazon MSK pipeline.

Step 2: OpenSearch Ingestion pipeline to OpenSearch Service

OpenSearch Ingestion offers a direct method for streaming EV data into OpenSearch. The OpenSearch sink plugin channels data from multiple sources directly into the OpenSearch domain. Instead of manually provisioning the pipeline, you define the capacity for your pipeline using OCUs. Each OCU provides 6 GB of memory and two virtual CPUs. To use OpenSearch Ingestion auto-scaling optimally, it’s essential to configure the maximum number of OCUs for a pipeline based on the number of partitions in the topics being ingested. If a topic has a large number of partitions (for example, more than 96, which is the maximum OCUs per pipeline), it’s recommended to configure the pipeline with a maximum of 1–96 OCUs. This way, the pipeline can automatically scale up or down within this range as needed. However, if a topic has a low number of partitions (for example, fewer than 96), it’s advisable to set the maximum number of OCUs to be equal to the number of partitions. This approach ensures that each partition is processed by a dedicated OCU enabling parallel processing and optimal performance. In scenarios where a pipeline ingests data from multiple topics, the topic with the highest number of partitions should be used as a reference to configure the maximum OCUs. Additionally, if higher throughput is required, you can create another pipeline with a new set of OCUs for the same topic and consumer group, enabling near-linear scalability.

OpenSearch Ingestion provides several pre-defined configuration blueprints that can help you quickly build your ingestion pipeline on AWS.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline that uses OpenSearch as a sink, with a dead letter queue (DLQ) to Amazon S3. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.

sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<domain-name>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam:: <<account-id>>:role/<<role-name>>"

Step 3: OpenSearch Ingestion to Amazon S3

OpenSearch Ingestion offers a built-in sink for loading streaming data directly into S3. The service can compress, partition, and optimize the data for cost-effective storage and analytics in Amazon S3. Data loaded into S3 can be partitioned for easier query isolation and lifecycle management. Partitions can be based on vehicle ID, date, geographic region, or other dimensions as needed for your queries.

The following snippet illustrates how we’ve partitioned and stored EV data in Amazon S3.

- s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "evbucket"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

The pipeline can be created following the steps in Creating Amazon OpenSearch Ingestion pipelines.

The following is the complete pipeline configuration, combining the configuration of all three steps. Update the Amazon Resource Names (ARNs), AWS Region, OpenSearch Service domain endpoint, and S3 bucket names as needed.

The entire OpenSearch Ingestion pipeline configuration can be copied directly into the ‘Pipeline configuration’ field in the AWS Management Console while creating the OpenSearch Ingestion pipeline.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true           # Default is false  
      topics: 
         - name: "<<msk-topic-name>>" 
           group_id: "opensearch-consumer" 
           serde_format: json        
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-east-1:<<account-id>>:cluster/<<cluster-name>>/<<cluster-id>>" 
  processor:
      - parse_json:
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<opensearch-service-domain-endpoint>>.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
      - s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "<<bucket-name>>"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

Real-time analytics

After the data is available in OpenSearch Service, you can build real-time monitoring and notifications. OpenSearch Service has robust support for multiple notification channels, allowing you to receive alerts through services like Slack, Chime, custom webhooks, Microsoft Teams, email, and Amazon Simple Notification Service (Amazon SNS).

The following screenshot illustrates supported notification channels in OpenSearch Service.

The notification feature in OpenSearch Service allows you to create monitors that will watch for certain conditions or changes in your data and launch alerts, such as monitoring vehicle telemetry data and launching alerts for issues like battery degradation or abnormal energy consumption. For example, you can create a monitor that analyzes battery capacity over time and notifies the on-call team using Slack if capacity drops below expected degradation curves in a significant number of vehicles. This could indicate a potential manufacturing defect requiring investigation.

In addition to notifications, OpenSearch Service makes it easy to build real-time dashboards to visually track metrics across your fleet of vehicles. You can ingest vehicle telemetry data like location, speed, fuel consumption, and so on, and visualize it on maps, charts, and gauges. Dashboards can provide real-time visibility into vehicle health and performance.

The following screenshot illustrates creating a sample dashboard on OpenSearch Service.

Opensearch Dashboard

A key benefit of OpenSearch Service is its ability to handle high sustained ingestion and query rates with millisecond latencies. It distributes incoming vehicle data across data nodes in a cluster for parallel processing. This allows OpenSearch to scale out to handle very large fleets while still delivering the real-time performance needed for operational visibility and alerting.

Batch analytics

After the data is available in Amazon S3, you can build a secure data lake to power a variety of analytics use cases deriving powerful insights. As an immutable store, new data is continually stored in S3 while existing data remains unaltered. This serves as a single source of truth for downstream analytics.

For business intelligence and reporting, you can analyze trends, identify insights, and create rich visualizations powered by the data lake. You can use Amazon QuickSight to build and share dashboards without needing to set up servers or infrastructure. Here’s an example of a QuickSight dashboard for IoT device data. You can use such a dashboard to gain insights from historical data that can help with better vehicle and battery design.

The Amazon QuickSight public gallery shows examples of dashboards across different domains.

You should consider Amazon OpenSearch dashboards for your operational, day-to-day use cases to identify issues and alert in near real time, and use Amazon QuickSight to analyze the big data stored in your lake house and generate actionable insights from it.

Clean up

Delete the OpenSearch pipeline and Amazon MSK cluster to stop incurring costs on these services.

Conclusion

In this post, you learned how Amazon MSK, OpenSearch Ingestion, OpenSearch Service, and Amazon S3 can be integrated to ingest, process, store, analyze, and act on endless streams of EV data efficiently.

With OpenSearch Ingestion as the integration layer between streams and storage, the entire pipeline scales up and down automatically based on demand. No more complex cluster management or lost data from bursts in streams.

See Amazon OpenSearch Ingestion to learn more.


About the authors

Ayush Agrawal is a Startups Solutions Architect from Gurugram, India with 11 years of experience in Cloud Computing. With a keen interest in AI, ML, and Cloud Security, Ayush is dedicated to helping startups navigate and solve complex architectural challenges. His passion for technology drives him to constantly explore new tools and innovations. When he’s not architecting solutions, you’ll find Ayush diving into the latest tech trends, always eager to push the boundaries of what’s possible.

Fraser Sequeira is a Solutions Architect with AWS based in Mumbai, India. In his role at AWS, Fraser works closely with startups to design and build cloud-native solutions on AWS, with a focus on analytics and streaming workloads. With over 10 years of experience in cloud computing, Fraser has deep expertise in big data, real-time analytics, and building event-driven architecture on AWS.

How PostNL processes billions of IoT events with Amazon Managed Service for Apache Flink

Post Syndicated from Çağrı Çakır original https://aws.amazon.com/blogs/big-data/how-postnl-processes-billions-of-iot-events-with-amazon-managed-service-for-apache-flink/

This post is co-written with Çağrı Çakır and Özge Kavalcı from PostNL.

PostNL is the designated universal postal service provider for the Netherlands and has three main business units offering postal delivery, parcel delivery, and logistics solutions for ecommerce and cross-border solutions. With 5,800 retail points, 11,000 mailboxes, and over 900 automated parcel lockers, the company plays an important role in the logistics value chain. It aims to be the delivery organization of choice by making it as easy as possible to send and receive parcels and mail. With almost 34,000 employees, PostNL is at the heart of society. On a typical weekday, the company delivers an average of 1.1 million parcels and 6.9 million letters across Belgium, the Netherlands, and Luxembourg.

In this post, we describe the legacy PostNL stream processing solution, its challenges, and why PostNL chose Amazon Managed Service for Apache Flink to help modernize their Internet of Things (IoT) data stream processing platform. We provide a reference architecture, describe the steps we took to migrate to Apache Flink, and the lessons learned along the way.

With this migration, PostNL has been able to build a scalable, robust, and extendable stream processing solution for their IoT platform. Apache Flink is a perfect fit for IoT. Scaling horizontally, it allows processing the sheer volume of data generated by IoT devices. With event time semantics, you can correctly handle events in the order they were generated, even from occasionally disconnected devices.

PostNL is excited about the potential of Apache Flink, and now plans to use Managed Service for Apache Flink with other streaming use cases and shift more business logic upstream into Apache Flink.

Apache Flink and Managed Service for Apache Flink

Apache Flink is a distributed computation framework that allows for stateful real-time data processing. It provides a single set of APIs for building batch and streaming jobs, making it straightforward for developers to work with bounded and unbounded data. Managed Service for Apache Flink is an AWS service that provides a serverless, fully managed infrastructure for running Apache Flink applications. Developers can build highly available, fault-tolerant, and scalable Apache Flink applications with ease and without needing to become an expert in building, configuring, and maintaining Apache Flink clusters on AWS.

The challenge of real-time IoT data at scale

Today, PostNL’s IoT platform, the Roller Cages solution, tracks more than 380,000 assets with Bluetooth Low Energy (BLE) technology in near real time. The IoT platform was designed to provide availability, geofencing, and bottom state events for each asset by using telemetry sensor data, such as GPS points and accelerometer readings, coming from Bluetooth devices. Those events are used by different internal consumers to make logistical operations straightforward to plan, more efficient, and sustainable.

PostNL Roller cages tracking solution

Tracking this high volume of assets emitting different sensor readings inevitably creates billions of raw IoT events for the IoT platform as well as for the downstream systems. Handling this load repeatedly both within the IoT platform and throughout the downstream systems was neither cost-efficient nor easy to maintain. To reduce the cardinality of events, the IoT platform uses stream processing to aggregate data over fixed time windows. These aggregations must be based on the moment when the device emitted the event. This type of aggregation based on event time becomes complex when messages may be delayed and arrive out of order, which may frequently happen with IoT devices that can get disconnected temporarily.

The following diagram illustrates the overall flow from edge to the downstream systems.

PostNL IoT workflow

The workflow consists of the following components:

  1. The edge architecture includes IoT BLE devices that serve as sources of telemetry data, and gateway devices that connect these IoT devices to the IoT platform.
  2. Inlets contain a set of AWS services such as AWS IoT Core and Amazon API Gateway to collect IoT detections using MQTTS or HTTPS and deliver them to the source data stream using Amazon Kinesis Data Streams.
  3. The aggregation application filters IoT detections, aggregates them for a fixed time window, and sinks aggregations to the destination data stream.
  4. Event producers are the combination of different stateful services that generate IoT events such as geofencing, availability, bottom state, and in-transit.
  5. Outlets, including services such as Amazon EventBridge, Amazon Data Firehose, and Kinesis Data Streams, deliver produced events to consumers.
  6. Consumers, which are internal teams, interpret IoT events and build business logic based on them.

The core component of this architecture is the aggregation application. This component was originally implemented using a legacy stream processing technology. For several reasons, as we discuss shortly, PostNL decided to evolve this critical component. The journey of replacing the legacy stream processing with Managed Service for Apache Flink is the focus of the rest of this post.

The decision to migrate the aggregation application to Managed Service for Apache Flink

As the number of connected devices grows, so does the necessity for a robust and scalable platform capable of handling and aggregating massive volumes of IoT data. After thorough analysis, PostNL opted to migrate to Managed Service for Apache Flink, driven by several strategic considerations that align with evolving business needs:

  • Enhanced data aggregation – Using Apache Flink’s strong capabilities in real-time data processing enables PostNL to efficiently aggregate raw IoT data from various sources. The ability to extend the aggregation logic beyond what was provided by the current solution can unlock more sophisticated analytics and more informed decision-making processes.
  • Scalability – The managed service provides the ability to scale your application horizontally. This allows PostNL to handle increasing data volumes effortlessly as the number of IoT devices grows. This scalability means that data processing capabilities can expand in tandem with the business.
  • Focus on core business – By adopting a managed service, the IoT platform team can focus on implementing business logic and develop new use cases. The learning curve and overhead of operating Apache Flink at scale would have diverted valuable energies and resources of the relatively small team, slowing down the adoption process.
  • Cost-effectiveness – Managed Service for Apache Flink employs a pay-as-you-go model that aligns with operational budgets. This flexibility is particularly beneficial for managing costs in line with fluctuating data processing needs.

Challenges of handling late events

Common stream processing use cases require aggregating events based on when they were generated. This is called event time semantics. When implementing this type of logic, you may encounter the problem of delayed events, in which events reach your processing system late, long after other events generated around the same time.

Late events are common in IoT due to reasons inherent to the environment, such as network delays, device failures, temporarily disconnected devices, or downtime. IoT devices often communicate over wireless networks, which can introduce delays in transmitting data packets. And sometimes they may experience intermittent connectivity issues, resulting in data being buffered and sent in batches after connectivity is restored. This may result in events being processed out of order—some events may be processed several minutes after other events that were generated around the same time.

Imagine you want to aggregate events generated by devices within a specific 10-second window. If events can be several minutes late, how can you be sure you have received all events that were generated in those 10 seconds?

A simple implementation may just wait for several minutes, allowing late events to arrive. But this method means that you can’t calculate the result of your aggregation until several minutes later, increasing the output latency. Another solution would be waiting a few seconds, and then dropping any events arriving later.

Increasing latency or dropping events that may contain critical information are not palatable options for the business. The solution must be a good compromise, a trade-off between latency and completeness.

Apache Flink offers event time semantics out of the box. In contrast to other stream processing frameworks, Flink offers multiple options for dealing with late events. We dive into how Apache Flink deals with late events next.

A powerful stream processing API

Apache Flink provides a rich set of operators and libraries for common data processing tasks, including windowing, joins, filters, and transformations. It also includes over 40 connectors for various data sources and sinks, including streaming systems like Apache Kafka and Amazon Managed Streaming for Apache Kafka, or Kinesis Data Streams, databases, and also file system and object stores like Amazon Simple Storage Service (Amazon S3).

But the most important characteristic for PostNL is that Apache Flink offers different APIs with different levels of abstraction. You can start at a higher level of abstraction with SQL or the Table API. These APIs abstract streaming data as more familiar tables, making them easier to learn for simpler use cases. If your logic becomes more complex, you can switch to the lower level of abstraction of the DataStream API, where streams are represented natively, closer to the processing happening inside Apache Flink. If you need the finest-grained level of control over how each individual event is handled, you can switch to the ProcessFunction.

A key learning has been that choosing one level of abstraction for your application is not an irreversible architectural decision. In the same application, you can mix different APIs, depending on the level of control you need at that specific step.
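As an illustration of that flexibility (not PostNL’s code), the following self-contained sketch starts with the Table API and SQL, then drops back to the DataStream API on the same data; the toy detections stream and column names are assumptions:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MixedApiExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A toy stream standing in for the raw IoT detections
        DataStream<Tuple2<String, Double>> detections = env.fromElements(
                Tuple2.of("device-1", 21.5),
                Tuple2.of("device-2", 19.0));

        // Higher abstraction: treat the stream as a table and filter it with SQL
        tableEnv.createTemporaryView("detections", tableEnv.fromDataStream(detections));
        Table filtered = tableEnv.sqlQuery(
                "SELECT f0 AS deviceId, f1 AS measurement FROM detections WHERE f1 > 20");

        // Lower abstraction: switch back to the DataStream API for fine-grained, per-event control
        DataStream<Row> filteredStream = tableEnv.toDataStream(filtered);
        filteredStream.print();

        env.execute("mixed-api-example");
    }
}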

Scaling horizontally

To process billions of raw events and grow with the business, the ability to scale was an essential requirement for PostNL. Apache Flink is designed to scale horizontally, distributing processing and application state across multiple processing nodes, with the ability to scale out further when the workload grows.

For this particular use case, PostNL had to aggregate the sheer volume of raw events with similar characteristics and over time, to reduce their cardinality and make the data flow manageable for the other systems downstream. These aggregations go beyond simple transformations that handle one event at a time. They require a framework capable of stateful stream processing. This is exactly the type of use case Apache Flink was designed for.

Advanced event time semantics

Apache Flink emphasizes event time processing, which enables accurate and consistent handling of data with respect to the time it occurred. By providing built-in support for event time semantics, Flink can handle out-of-order events and late data gracefully. This capability was fundamental for PostNL. As mentioned, IoT generated events may arrive late and out of order. However, the aggregation logic must be based on the moment the measurement was actually taken by the device—the event time—and not when it’s processed.

Resiliency and guarantees

PostNL had to make sure no data sent from the device is lost, even in case of failure or restart of the application. Apache Flink offers strong fault tolerance guarantees through its distributed snapshot-based checkpointing mechanism. In the event of failures, Flink can recover the state of the computations and achieve exactly-once semantics of the result. For example, each event from a device is never missed nor counted twice, even in the event of an application failure.

The journey of choosing the right Apache Flink API

A key requirement of the migration was reproducing exactly the behavior of the legacy aggregation application, as expected by the downstream systems that can’t be modified. This introduced several additional challenges, in particular around windowing semantics and late event handling.

As we have seen, in IoT, events may be out of order by several minutes. Apache Flink offers two high-level concepts for implementing event time semantics with out-of-order events: watermarks and allowed lateness.
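
The aggregation application itself is written in Java (as described later in this post). Purely as an illustration of the first concept, the following is a minimal PyFlink sketch that declares event time with a bounded-out-of-orderness watermark; the eventTime field name and the 5-minute bound are assumptions, not PostNL's actual values.

from pyflink.common import Duration
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy

class DeviceTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        # Event time in epoch milliseconds, written on the event by the device
        return value["eventTime"]

# Tolerate events arriving up to 5 minutes out of order before the watermark advances
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5 * 60))
    .with_timestamp_assigner(DeviceTimestampAssigner())
)

# events = events.assign_timestamps_and_watermarks(watermark_strategy)

Allowed lateness, the second concept, additionally keeps a window's state around after the watermark has passed, so that late events can still update the previously emitted result.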

Apache Flink provides a range of flexible APIs with different levels of abstraction. After some initial research, Flink-SQL and the Table API were discarded. These higher levels of abstraction provide advanced windowing and event time semantics, but couldn’t provide the fine-grained control PostNL needed to reproduce exactly the behavior of the legacy application.

The lower level of abstraction of the DataStream API also offers windowing aggregation capabilities, and allows you to customize the behaviors with custom triggers, evictors, and handling late events by setting an allowed lateness.

Unfortunately, the legacy application was designed to handle late events in a peculiar way. The result was a hybrid event time and processing time logic that couldn’t be easily reproduced using high-level Apache Flink primitives.

Fortunately, Apache Flink offers a further lower level of abstraction, the ProcessFunction API. With this API, you have the finest-grained control on application state, and you can use timers to implement virtually any custom time-based logic.

PostNL decided to go in this direction. The aggregation was implemented using a KeyedProcessFunction that provides a way to perform arbitrary stateful processing on keyed streams—logically partitioned streams. Raw events from each IoT device are aggregated based on their event time (the timestamp written on the event by the source device), and the results of each window are emitted based on processing time (the current system time).
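
PostNL's implementation is in Java and isn't reproduced here. The following is a simplified, hypothetical PyFlink sketch of the same pattern: events are grouped per key into event-time windows held in keyed state, while a processing-time timer controls when results are emitted. The event field names, window size, and emission delay are illustrative assumptions.

from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor

WINDOW_MS = 5 * 60 * 1000      # assumed 5-minute event-time windows
EMIT_DELAY_MS = 60 * 1000      # assumed processing-time delay before emitting

class DeviceAggregator(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        # Per-device state: event-time window start -> running count
        self.windows = runtime_context.get_map_state(
            MapStateDescriptor("windows", Types.LONG(), Types.LONG()))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        event_time = value["eventTime"]  # timestamp written by the device
        window_start = event_time - (event_time % WINDOW_MS)
        current = self.windows.get(window_start) or 0
        self.windows.put(window_start, current + 1)
        # Emission is driven by processing time, not by watermarks
        ctx.timer_service().register_processing_time_timer(
            ctx.timer_service().current_processing_time() + EMIT_DELAY_MS)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # Flush all windows for this device, then clear the state so it
        # cannot grow unbounded (see the performance section later)
        for window_start in list(self.windows.keys()):
            yield ctx.get_current_key(), window_start, self.windows.get(window_start)
        self.windows.clear()

# aggregated = events.key_by(lambda e: e["deviceId"]).process(DeviceAggregator())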

This fine-grained control finally allowed PostNL to reproduce exactly the behavior expected by the downstream applications.

The journey to production readiness

Let’s explore the journey of migrating to Managed Service for Apache Flink, from the start of the project to the rollout to production.

Identifying requirements

The first step of the migration process focused on thoroughly understanding the existing system’s architecture and performance metrics. The goal was to provide a seamless transition to Managed Service for Apache Flink with minimal disruption to ongoing operations.

Understanding Apache Flink

PostNL needed to familiarize themselves with Managed Service for Apache Flink and its stream processing capabilities, including built-in windowing strategies, aggregation functions, the differences between event time and processing time, and finally KeyedProcessFunction and the mechanisms for handling late events.

Different options were considered, using primitives provided by Apache Flink out of the box, for event time logic and late events. The biggest requirement was to reproduce exactly the behavior of the legacy application. The ability to switch to a lower level of abstraction helped. Using the finest-grained control allowed by the ProcessFunction API, PostNL was able to handle late events exactly as the legacy application did.

Designing and implementing ProcessFunction

The business logic was designed using ProcessFunction to emulate the peculiar behavior of the legacy application in handling late events without excessively delaying the initial results. PostNL decided to use Java for the implementation, because Java is the primary language for Apache Flink. Apache Flink allows you to develop and test your application locally, in your preferred integrated development environment (IDE), using all the available debug tools, before deploying it to Managed Service for Apache Flink. Java 11 and Maven were used for the implementation. For more information about IDE requirements, refer to Getting started with Amazon Managed Service for Apache Flink (DataStream API).

Testing and validation

The following diagram shows the architecture used to validate the new application.

Testing architecture

To validate the behavior of the ProcessFunction and late event handling mechanisms, integration tests were designed to run both the legacy application and the Managed Service for Flink application in parallel (Steps 3 and 4). This parallel execution allowed PostNL to directly compare the results generated by each application under identical conditions. Multiple integration test cases push data to the source stream (2) in parallel (7) and wait until their aggregation window is complete, then they pull the aggregated results from the destination stream to compare (8). Integration tests are automatically triggered by the CI/CD pipeline after deployment of the infrastructure is complete. During the integration tests, the primary focus was on achieving data consistency and processing accuracy between the legacy application and the Managed Service for Flink application. The output streams, aggregated data, and processing latencies were compared to validate that the migration didn’t introduce any unexpected discrepancies. For writing and running the integration tests, Robot Framework, an open source automation framework, was utilized.
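
PostNL's tests are written with Robot Framework; the following plain-Python sketch only illustrates the push-and-poll pattern described above, assuming the source and destination streams are Kinesis data streams and using placeholder stream and field names.

import json
import time
import boto3

kinesis = boto3.client("kinesis")

def put_test_event(device_id: str, event_time_ms: int) -> None:
    # Push a synthetic raw event to the source stream (step 2)
    kinesis.put_record(
        StreamName="source-stream",
        PartitionKey=device_id,
        Data=json.dumps({"deviceId": device_id, "eventTime": event_time_ms}))

def read_aggregates(timeout_s: int = 300) -> list:
    # Poll the destination stream for aggregated results to compare (step 8)
    iterator = kinesis.get_shard_iterator(
        StreamName="destination-stream",
        ShardId="shardId-000000000000",
        ShardIteratorType="LATEST")["ShardIterator"]
    deadline = time.time() + timeout_s
    records = []
    while time.time() < deadline and not records:
        result = kinesis.get_records(ShardIterator=iterator, Limit=100)
        records.extend(result["Records"])
        iterator = result["NextShardIterator"]
        time.sleep(1)
    return [json.loads(r["Data"]) for r in records]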

After the integration tests pass, there is one more validation layer: end-to-end tests. Similar to the integration tests, end-to-end tests are automatically invoked by the CI/CD pipeline after the deployment of the platform infrastructure is complete. This time, multiple end-to-end test cases send data to AWS IoT Core (1) in parallel (9) and check the aggregated results in the destination S3 bucket (5, 6), dumped from the output stream, to compare (10).

Deployment

PostNL decided to run the new Flink application in shadow mode. The new application ran for some time in parallel with the legacy application, consuming exactly the same inputs, and sending output from both applications to a data lake on Amazon S3. This allowed them to compare the results of the two applications using real production data, and also to test the stability and performance of the new one.

Performance optimization

During migration, the PostNL IoT platform team learned how the Flink application can be fine-tuned for optimal performance, considering factors such as data volume, processing speed, and efficient late event handling. A particularly interesting aspect was to verify that the state size wasn’t increasing unbounded over the long term. A risk of using the finest-grained control of ProcessFunction is state leak. This happens when your implementation, directly controlling the state in the ProcessFunction, misses some corner cases where a state is never deleted. This causes the state to grow unbounded. Because streaming applications are designed to run continuously, an expanding state can degrade performance and eventually exhaust memory or local disk space.

With this phase of testing, PostNL found the right balance of application parallelism and resources—including compute, memory, and storage—to process the normal daily workload profile without lag, and handle occasional peaks without over-provisioning, optimizing both performance and cost-effectiveness.

Final switch

After running the new application in shadow mode for some time, the team decided the application was stable and emitting the expected output. The PostNL IoT platform team finally switched over to production and shut down the legacy application.

Key takeaways

Among the several learnings gathered in the journey of adopting Managed Service for Apache Flink, some are particularly important and are proving key when expanding to new and diverse use cases:

  • Understand event time semantics – A deep understanding of event time semantics is crucial in Apache Flink for accurately implementing time-dependent data operations. This knowledge makes sure events are processed correctly relative to when they actually occurred.
  • Use the powerful Apache Flink API – Apache Flink’s API allows for the creation of complex, stateful streaming applications beyond basic windowing and aggregations. It’s important to fully grasp the extensive capabilities offered by the API to tackle sophisticated data processing challenges.
  • With power comes more responsibility – The advanced functionality of Apache Flink’s API brings significant responsibility. Developers must make sure applications are efficient, maintainable, and stable, requiring careful resource management and adherence to best practices in coding and system design.
  • Don’t mix event time and processing time logic – Combining event time and processing time for data aggregation presents unique challenges. It prevents you from using higher-level functionalities provided out of the box by Apache Flink. The lowest level of abstraction among the Apache Flink APIs allows for implementing custom time-based logic, but requires careful design to achieve accuracy and timely results, alongside extensive testing to validate good performance.

Conclusion

In the journey of adopting Apache Flink, the PostNL team learned how the powerful Apache Flink APIs allow you to implement complex business logic. The team came to appreciate how Apache Flink can be used to solve diverse problems, and they are now planning to extend it to more stream processing use cases.

With Managed Service for Apache Flink, the team was able to focus on the business value and implementing the required business logic, without worrying about the heavy lifting of setting up and managing an Apache Flink cluster.

To learn more about Managed Service for Apache Flink and choosing the right managed service option and API for your use case, see What is Amazon Managed Service for Apache Flink. To experience hands-on how to develop, deploy, and operate Apache Flink applications on AWS, see the Amazon Managed Service for Apache Flink Workshop.


About the Authors

Çağrı Çakır is the Lead Software Engineer for the PostNL IoT platform, where he manages the architecture that processes billions of events each day. As an AWS Certified Solutions Architect Professional, he specializes in designing and implementing event-driven architectures and stream processing solutions at scale. He is passionate about harnessing the power of real-time data, and dedicated to optimizing operational efficiency and innovating scalable systems.

Özge Kavalcı works as Senior Solution Engineer for the PostNL IoT platform and loves to build cutting-edge solutions that integrate with the IoT landscape. As an AWS Certified Solutions Architect, she specializes in designing and implementing highly scalable serverless architectures and real-time stream processing solutions that can handle unpredictable workloads. To unlock the full potential of real-time data, she is dedicated to shaping the future of IoT integration.

Amit Singh works as a Senior Solutions Architect at AWS with enterprise customers on the value proposition of AWS, and participates in deep architectural discussions to make sure solutions are designed for successful deployment in the cloud. This includes building deep relationships with senior technical individuals to enable them to be cloud advocates. In his free time, he likes to spend time with his family and learn more about everything cloud.

Lorenzo Nicora works as Senior Streaming Solutions Architect at AWS helping customers across EMEA. He has been building cloud-centered, data-intensive systems for several years, working in the finance industry both through consultancies and for fintech product companies. He has used open-source technologies extensively and contributed to several projects, including Apache Flink.

The UK Bans Default Passwords

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/05/the-uk-bans-default-passwords.html

The UK is the first country to ban default passwords on IoT devices.

On Monday, the United Kingdom became the first country in the world to ban default guessable usernames and passwords from these IoT devices. Unique passwords installed by default are still permitted.

The Product Security and Telecommunications Infrastructure Act 2022 (PSTI) introduces new minimum-security standards for manufacturers, and demands that these companies are open with consumers about how long their products will receive security updates for.

The UK may be the first country, but as far as I know, California is the first jurisdiction. It banned default passwords in 2018, the law taking effect in 2020.

This sort of thing benefits all of us everywhere. IoT manufacturers aren’t making two devices, one for California and one for the rest of the US. And they’re not going to make one for the UK and another for the rest of Europe, either. They’ll remove the default passwords and sell those devices everywhere.

Another news article.

Using Amazon Verified Permissions to manage authorization for AWS IoT smart home applications

Post Syndicated from Rajat Mathur original https://aws.amazon.com/blogs/security/using-amazon-verified-permissions-to-manage-authorization-for-aws-iot-smart-thermostat-applications/

This blog post introduces how manufacturers and smart appliance consumers can use Amazon Verified Permissions to centrally manage permissions and fine-grained authorizations. Developers can offer more intuitive, user-friendly experiences by designing interfaces that align with user personas and multi-tenancy authorization strategies, which can lead to higher user satisfaction and adoption. Traditionally, implementing authorization logic using role-based access control (RBAC) or attribute-based access control (ABAC) within IoT applications can become complex as the number of connected devices and associated user roles grows. This often leads to an unmanageable increase in access rules that must be hard-coded into each application, requiring excessive compute power for evaluation. By using Verified Permissions, you can externalize the authorization logic using Cedar policy language, enabling you to define fine-grained permissions that combine RBAC and ABAC models. This decouples permissions from your application’s business logic, providing a centralized and scalable way to manage authorization while reducing development effort.

In this post, we walk you through a reference architecture that outlines an end-to-end smart thermostat application solution using AWS IoT Core, Verified Permissions, and other AWS services. We show you how to use Verified Permissions to build an authorization solution using Cedar policy language to define dynamic policy-based access controls for different user personas. The post includes a link to a GitHub repository that houses the code for the web dashboard and the Verified Permissions logic to control access to the solution APIs.

Solution overview

This solution consists of a smart thermostat IoT device and an AWS hosted web application using Verified Permissions for fine-grained access to various application APIs. For this use case, the AWS IoT Core device is being simulated by an AWS Cloud9 environment and communicates with the IoT service using AWS IoT Device SDK for Python. After being configured, the device connects to AWS IoT Core to receive commands and send messages to various MQTT topics.

As a general practice, when a user-facing IoT solution is implemented, the manufacturer performs administrative tasks such as:

  1. Embedding AWS Private Certificate Authority certificates into each IoT device (in this case a smart thermostat). Usually this is done on the assembly line and the certificates used to verify the IoT endpoints are burned into device memory along with the firmware.
  2. Creating an Amazon Cognito user pool that provides sign-up and sign-in options for web and mobile application users and hosts the authentication process.
  3. Creating policy stores and policy templates in Verified Permissions. Based on who signs up, the manufacturer creates policies with Verified Permissions to link each signed-up user to certain allowed resources or IoT devices.
  4. The mapping of user to device is stored in a datastore. For this solution, you’ll use an Amazon DynamoDB table to record the relationship.

The user who purchases the device (the primary device owner) performs the following tasks:

  1. Signs up on the manufacturer’s web application or mobile app and registers the IoT device by entering a unique serial number. The mapping between user details and the device serial number is stored in the datastore through an automated process that is initiated after sign-up and device claim.
  2. Connects the new device to an existing wireless network, which initiates a registration process to securely connect to AWS IoT Core services within the manufacturer’s account.
  3. Invites other users (such as guests, family members, or the power company) through a referral, invitation link, or a designated OAuth process.
  4. Assigns roles, and therefore permissions, to the other users.
     
Figure 1: Sample smart home application architecture built using AWS services

Figure 1 depicts the solution as three logical components:

  1. The first component depicts device operations through AWS IoT Core. The smart thermostat is on site and it communicates with AWS IoT Core and its state is managed through the AWS IoT Device Shadow Service.
  2. The second component depicts the web application, which is the application interface that customers use. It’s a ReactJS-backed single page application deployed using AWS Amplify.
  3. The third component shows the backend application, which is built using Amazon API Gateway, AWS Lambda, and DynamoDB. A Cognito user pool is used to manage application users and their authentication. Authorization is handled by Verified Permissions where you create and manage policies that are evaluated when the web application calls backend APIs. These policies are evaluated against each authorization policy to provide an access decision to deny or allow an action.

The solution flow itself can be broken down into three steps after the device is onboarded and users have signed up:

  1. The smart thermostat device connects and communicates with AWS IoT Core using the MQTT protocol. A classic Device Shadow is created for the AWS IoT thing Thermostat1 when the UpdateThingShadow call is made the first time through the AWS SDK for a new device. AWS IoT Device Shadow service lets the web application query and update the device’s state in case of connectivity issues.
  2. Users sign up or sign in to the Amplify hosted smart home application and authenticate themselves against a Cognito user pool. They’re mapped to a device, which is stored in a DynamoDB table.
  3. After the users sign in, they’re allowed to perform certain tasks and view certain sections of the dashboard based on the different roles and policies managed by Verified Permissions. The underlying Lambda function that’s responsible for handling the API calls queries the DynamoDB table to provide user context to Verified Permissions (a minimal sketch of this authorization call follows this list).
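
The following boto3 sketch shows what that authorization call can look like. The entity and action names follow the schema created later in this walkthrough; the environment variable name and function shape are assumptions, not the repository's actual handler code. Note that policies referencing resource attributes (such as the primary owner policy) also need the device entity and its primaryOwner attribute supplied through the entities parameter, omitted here for brevity.

import os
import boto3

avp = boto3.client("verifiedpermissions")

def is_action_allowed(user_id, action, device_id, desired_temperature, minutes_since_midnight):
    # Ask Verified Permissions for an authorization decision
    response = avp.is_authorized(
        policyStoreId=os.environ["POLICY_STORE_ID"],
        principal={"entityType": "AwsIotAvpWebApp::User", "entityId": user_id},
        action={"actionType": "AwsIotAvpWebApp::Action", "actionId": action},
        resource={"entityType": "AwsIotAvpWebApp::Device", "entityId": device_id},
        context={"contextMap": {
            "desiredTemperature": {"long": desired_temperature},
            "time": {"long": minutes_since_midnight},
        }},
    )
    return response["decision"] == "ALLOW"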

Prerequisites

  1. To deploy this solution, you need access to the AWS Management Console and AWS Command Line Interface (AWS CLI) on your local machine with sufficient permissions to access required services, including Amplify, Verified Permissions, and AWS IoT Core. For this solution, you’ll give the services full access to interact with different underlying services. But in production, we recommend following security best practices with AWS Identity and Access Management (IAM), which involves scoping down policies.
  2. Set up Amplify CLI by following these instructions. We recommend the latest NodeJS stable long-term support (LTS) version. At the time of publishing this post, the LTS version was v20.11.1. Users can manage multiple NodeJS versions on their machines by using a tool such as Node Version Manager (nvm).

Walkthrough

The following table describes the actions, resources, and authorization decisions that will be enforced through Verified Permissions policies to achieve fine-grained access control. In this example, John is the primary device owner and has purchased and provisioned a new smart thermostat device called Thermostat1. He has invited Jane to access his device and has given her restricted permissions. John has full control over the device whereas Jane is only allowed to read the temperature and set the temperature between 72°F and 78°F.

John has also decided to give his local energy provider (Power Company) access to the device so that they can set the optimum temperature during the day to manage grid load and offer him maximum savings on his energy bill. However, they can only do so between 2:00 PM and 5:00 PM.

For security purposes, the Verified Permissions default decision is DENY for unauthorized principals.

Name | Principal | Action | Resource | Authorization decision
Any | Default | Default | Default | Deny
John | john_doe | Any | Thermostat1 | Allow
Jane | jane_doe | GetTemperature | Thermostat1 | Allow
Jane | jane_doe | SetTemperature | Thermostat1 | Allow only if the desired temperature is between 72°F and 78°F
Power Company | powercompany | GetTemperature | Thermostat1 | Allow only if accessed between the hours of 2:00 PM and 5:00 PM
Power Company | powercompany | SetTemperature | Thermostat1 | Allow only if the temperature is set between the hours of 2:00 PM and 5:00 PM

Create a Verified Permissions policy store

Verified Permissions is a scalable permissions management and fine-grained authorization service for the applications that you build. The policies are created using Cedar, a dedicated language for defining access permissions in applications. Cedar seamlessly integrates with popular authorization models such as RBAC and ABAC.

A policy is a statement that either permits or forbids a principal to take one or more actions on a resource. A policy store is a logical container that stores your Cedar policies, schema, and principal sources. A schema helps you to validate your policy and identify errors based on the definitions you specify. See Cedar schema to learn about the structure and formal grammar of a Cedar schema.

To create the policy store

  1. Sign in to the Amazon Verified Permissions console and choose Create policy store.
  2. In the Configuration Method section, select Empty Policy Store and choose Create policy store.
     
Figure 2: Create an empty policy store

Note: Make a note of the policy store ID to use when you deploy the solution.

To create a schema for the application

  1. On the Verified Permissions page, select Schema.
  2. In the Schema section, choose Create schema.
     
    Figure 3: Create a schema

  3. In the Edit schema section, choose JSON mode, paste the following sample schema for your application, and choose Save changes.
    {
        "AwsIotAvpWebApp": {
            "entityTypes": {
                "Device": {
                    "shape": {
                        "attributes": {
                            "primaryOwner": {
                                "name": "User",
                                "required": true,
                                "type": "Entity"
                            }
                        },
                        "type": "Record"
                    },
                    "memberOfTypes": []
                },
                "User": {}
            },
            "actions": {
                "GetTemperature": {
                    "appliesTo": {
                        "context": {
                            "attributes": {
                                "desiredTemperature": {
                                    "type": "Long"
                                },
                                "time": {
                                    "type": "Long"
                                }
                            },
                            "type": "Record"
                        },
                        "resourceTypes": [
                            "Device"
                        ],
                        "principalTypes": [
                            "User"
                        ]
                    }
                },
                "SetTemperature": {
                    "appliesTo": {
                        "resourceTypes": [
                            "Device"
                        ],
                        "principalTypes": [
                            "User"
                        ],
                        "context": {
                            "attributes": {
                                "desiredTemperature": {
                                    "type": "Long"
                                },
                                "time": {
                                    "type": "Long"
                                }
                            },
                            "type": "Record"
                        }
                    }
                }
            }
        }
    }
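
As an alternative to pasting the schema in the console, the same schema can be applied programmatically. The following is a hedged boto3 sketch; the policy store ID placeholder and the assumption that the schema above is saved locally as schema.json are illustrative.

import json
import boto3

avp = boto3.client("verifiedpermissions")

with open("schema.json") as f:
    schema = json.load(f)

# Attach the Cedar schema to the policy store created earlier
avp.put_schema(
    policyStoreId="<your policy store ID>",
    definition={"cedarJson": json.dumps(schema)},
)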

When creating policies in Cedar, you can define authorization rules using a static policy or a template-linked policy.

Static policies

In scenarios where a policy explicitly defines both the principal and the resource, the policy is categorized as a static policy. These policies are immediately applicable for authorization decisions, as they are fully defined and ready for implementation.

Template-linked policies

On the other hand, there are situations where a single set of authorization rules needs to be applied across a variety of principals and resources. Consider an IoT application where actions such as SetTemperature and GetTemperature must be permitted for specific devices. Using static policies for each unique combination of principal and resource can lead to an excessive number of almost identical policies, differing only in their principal and resource components. This redundancy can be efficiently addressed with policy templates. Policy templates allow for the creation of policies using placeholders for the principal, the resource, or both. After a policy template is established, individual policies can be generated by referencing this template and specifying the desired principal and resource. These template-linked policies function the same as static policies, offering a streamlined and scalable solution for policy management.

To create a policy that allows access to the primary owner of the device using a static policy

  1. In the Verified Permissions console, on the left pane, select Policies, then choose Create policy and select Create static policy from the drop-down menu.
     
    Figure 4: Create static policy

  2. Define the policy scope:
    1. Select Permit for the Policy effect.
       
      Figure 5: Define policy effect

    2. Select All Principals for Principals scope.
    3. Select All Resources for Resource scope.
    4. Select All Actions for Actions scope and choose Next.
       
      Figure 6: Define policy scope

  3. On the Details page, under Policy, paste the following full-access policy, which grants the primary owner permission to perform both SetTemperature and GetTemperature actions on the smart thermostat unconditionally. Choose Create policy.
    	permit (principal, action, resource)
    	when { resource.primaryOwner == principal };
    Figure 7: Write and review policy statement

To create a static policy to allow a guest user to read the temperature

In this example, the guest user is Jane (username: jane_doe).

  1. Create another static policy and specify the policy scope.
    1. Select Permit for the Policy effect.
       
      Figure 8: Define the policy effect

    2. Select Specific principal for the Principals scope.
    3. Select AwsIotAvpWebApp::User and enter jane_doe.
       
      Figure 9: Define the policy scope

    4. Select Specific resource for the Resources scope.
    5. Select AwsIotAvpWebApp::Device and enter Thermostat1.
    6. Select Specific set of actions for the Actions scope.
    7. Select GetTemperature and choose Next.
       
      Figure 10: Define resource and action scopes

    8. Enter the Policy description: Allow jane_doe to read thermostat1.
    9. Choose Create policy.

Next, you will create reusable policy templates to manage policies efficiently. You will create a policy template for a guest user with restricted temperature settings, limiting the range they can set to between 72°F and 78°F. In this case, the guest user is Jane (username: jane_doe).

To create a reusable policy template

  1. Select Policy template and enter Guest user template as the description.
  2. Paste the following sample policy in the Policy body and choose Create policy template.
    permit (
        principal == ?principal,
        action in [AwsIotAvpWebApp::Action::"SetTemperature"],
        resource == ?resource
    )
    when { context.desiredTemperature >= 72 && context.desiredTemperature <= 78 };
Figure 11: Create guest user policy template

As you can see, you don’t specify the principal and resource yet. You enter those when you create an actual policy from the policy template. The context object will be populated with the desiredTemperature property in the application and used to evaluate the decision.

You also need to create a policy template for the Power Company user with restricted time settings. Cedar policies don’t support a date/time format, so you must represent 2:00 PM and 5:00 PM as minutes elapsed since midnight (2:00 PM = 14 × 60 = 840 minutes, and 5:00 PM = 17 × 60 = 1020 minutes).
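
For example, here is a minimal sketch of how an application could compute the time context attribute before calling Verified Permissions; the attribute name matches the schema above, and everything else is illustrative.

from datetime import datetime

now = datetime.now()
# 2:00 PM -> 840, 5:00 PM -> 1020
minutes_since_midnight = now.hour * 60 + now.minute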

To create a policy template for the power company

  1. Select Policy template and enter Power company user template as the description.
  2. Paste the following sample policy in the Policy body and choose Create policy template.
    permit (
        principal == ?principal,
        action in [AwsIotAvpWebApp::Action::"SetTemperature", AwsIotAvpWebApp::Action::"GetTemperature"],
        resource == ?resource
    )
    when { context.time >= 840 && context.time < 1020 };

The policy templates accept the user and resource. The next step is to create a template-linked policy for Jane to set and get thermostat readings based on the Guest user template that you created earlier. For simplicity, you will manually create this policy using the Verified Permissions console. In production, application policies can be dynamically created using the Verified Permissions API.
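
For reference, the following is a hedged boto3 sketch of creating the same template-linked policy through the API; the policy store and template IDs are placeholders.

import boto3

avp = boto3.client("verifiedpermissions")

response = avp.create_policy(
    policyStoreId="<your policy store ID>",
    definition={
        "templateLinked": {
            "policyTemplateId": "<guest user template ID>",
            "principal": {"entityType": "AwsIotAvpWebApp::User", "entityId": "jane_doe"},
            "resource": {"entityType": "AwsIotAvpWebApp::Device", "entityId": "Thermostat1"},
        }
    },
)
print(response["policyId"])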

To create a template-linked policy for a guest user

  1. In the Verified Permissions console, on the left pane, select Policies, then choose Create policy and select Create template-linked policy from the drop-down menu.
     
    Figure 12: Create new template-linked policy

  2. Select the Guest user template and choose next.
     
    Figure 13: Select Guest user template

  3. Under parameter selection:
    1. For Principal enter AwsIotAvpWebApp::User::"jane_doe".
    2. For Resource enter AwsIotAvpWebApp::Device::"Thermostat1".
    3. Choose Create template-linked policy.
       
      Figure 14: Create guest user template-linked policy

Note that with this policy in place, jane_doe can only set the temperature of the device Thermostat1 to between 72°F and 78°F.

To create a template-linked policy for the power company user

Based on the template that was set up for the power company, you now need an actual policy for it.

  1. In the Verified Permissions console, go to the left pane and select Policies, then choose Create policy and select Create template-linked policy from the drop-down menu.
  2. Select the Power company user template and choose next.
  3. Under Parameter selection, for Principal enter AwsIotAvpWebApp::User::"powercompany", for Resource enter AwsIotAvpWebApp::Device::"Thermostat1", and choose Create template-linked policy.

Now that you have a set of policies in a policy store, you need to update the backend codebase to include this information and then deploy the web application using Amplify.

The policy statements in this post intentionally use human-readable values such as jane_doe and powercompany for the principal entity. This is useful when discussing general concepts but in production systems, customers should use unique and immutable values for entities. See Get the best out of Amazon Verified Permissions by using fine-grained authorization methods for more information.

Deploy the solution code from GitHub

Go to the GitHub repository to set up the Amplify web application. The repository Readme file provides detailed instructions on how to set up the web application. You will need your Verified Permissions policy store ID to deploy the application. For convenience, we’ve provided an onboarding script—deploy.sh—which you can use to deploy the application.

To deploy the application

  1. Clone the repository.
    git clone https://github.com/aws-samples/amazon-verified-permissions-iot-amplify-smart-home-application.git

  2. Deploy the application.
    ./deploy.sh <region> <Verified Permissions Policy Store ID>

After the web dashboard has been deployed, you’ll create an IoT device using AWS IoT Core.

Create an IoT device and connect it to AWS IoT Core

With the users, policies, and templates, and the Amplify smart home application in place, you can now create a device and connect it to AWS IoT Core to complete the solution.

To create the Thermostat1 device and connect it to AWS IoT Core

  1. From the left pane in the AWS IoT console, select Connect one device.
     
    Figure 15: Connect device using AWS IoT console

  2. Review how IoT Thing works and then choose Next.
     
    Figure 16: Review how IoT Thing works before proceeding

  3. Choose Create a new thing, enter Thermostat1 as the Thing name, and choose Next.
    Figure 17: Create the new IoT thing

  4. Select Linux/macOS as the Device platform operating system and Python as the AWS IoT Core Device SDK and choose next.
     
    Figure 18: Choose the platform and SDK for the device

  5. Choose Download connection kit and choose next.
     
    Figure 19: Download the connection kit to use for creating the Thermostat1 device

  6. Review the three steps to display messages from your IoT device. You will use them to verify the Thermostat1 IoT device's connectivity to the AWS IoT Core platform. They are:
    1. Step 1: Add execution permissions
    2. Step 2: Run the start script
    3. Step 3: Return to the AWS IoT Console to view the device’s message
       
      Figure 20: How to display messages from an IoT device

Solution validation

With all of the pieces in place, you can now test the solution.

Primary owner signs in to the web application to set Thermostat1 temperature to 82°F

Figure 21: Thermostat1 temperature update by John

  1. Sign in to the Amplify web application as John. You should be able to view the Thermostat1 controller on the dashboard.
  2. Set the temperature to 82°F.
  3. The Lambda function processes the request and performs an API call to Verified Permissions to determine whether to ALLOW or DENY the action based on the policies. Verified Permissions sends back an ALLOW, as the policy that was previously set up allows unrestricted access for primary owners.
  4. Upon receiving the response from Verified Permissions, the Lambda function sends the ALLOW decision back to the web application and makes an API call to the AWS IoT Device Shadow service to update the device (Thermostat1) temperature to 82°F (see the sketch after Figure 22).
     
Figure 22: Policy evaluation decision is ALLOW when a primary owner calls SetTemperature
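
The following is a minimal boto3 sketch of the Device Shadow update made after an ALLOW decision; the thing name comes from this walkthrough, while the client setup and payload shape are illustrative assumptions rather than the repository's exact code.

import json
import boto3

iot_data = boto3.client("iot-data")

def set_desired_temperature(thing_name, temperature):
    # Write the desired state to the classic device shadow; the simulated
    # device reads this field and reports the new value back
    payload = json.dumps({"state": {"desired": {"temperature": temperature}}})
    iot_data.update_thing_shadow(thingName=thing_name, payload=payload.encode("utf-8"))

set_desired_temperature("Thermostat1", 82)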

Guest user signs in to the web application to set Thermostat1 temperature to 80°F

Figure 23: Thermostat1 temperature update by Jane

  1. If you sign in as Jane to the Amplify web application, you can view the Thermostat1 controller on the dashboard.
  2. Set the temperature to 80°F.
  3. The Lambda function validates the actions by sending an API call to Verified Permissions to determine whether to ALLOW or DENY the action based on the established policies. Verified Permissions sends back a DENY, as the policy only permits temperature adjustments between 72°F and 78°F.
  4. Upon receiving the response from Verified Permissions, the Lambda function sends the DENY decision back to the web application, and an unauthorized response is returned.
     
    Figure 24: Guest user jane_doe receives a DENY when calling SetTemperature for a desired temperature of 80°F

  5. If you repeat the process (still as Jane) but set Thermostat1 to 75°F, the policy will cause the request to be allowed.
     
    Figure 25: Guest user jane_doe receives an ALLOW when calling SetTemperature for a desired temperature of 75°F

  6. Similarly, jane_doe is allowed to run GetTemperature on the device Thermostat1. When the temperature is set to 74°F, the device shadow is updated. The IoT device simulated by your AWS Cloud9 instance reads the desired temperature field and sets the reported value to 74.
  7. Now, when jane_doe runs GetTemperature, the value of the device is reported as 74, as shown in Figure 26. We encourage you to try different restrictions in the World Settings (outside temperature and time) by adding restrictions to the static policy that allows GetTemperature for the guest user.
     
    Figure 26: Guest user jane_doe receives an ALLOW when calling GetTemperature for the reported temperature

Power company signs in to the web application to set Thermostat1 to 78°F at 3:30 PM

Figure 27: Thermostat1 temperature set to 78°F by powercompany user at a specified time

  1. Sign in as the powercompany user to the Amplify web application using an API. You can view the Thermostat1 controller on the dashboard.
  2. To test this scenario, set the current time to 3:30 PM, and try to set the temperature to 78°F.
  3. The Lambda function validates the actions by sending an API call to Verified Permissions to determine whether to ALLOW or DENY the action based on pre-established policies. Verified Permissions returns ALLOW permission, because the policy for powercompany permits device temperature changes between 2:00 PM and 5:00 PM.
  4. Upon receiving the response from Verified Permissions, the Lambda function sends ALLOW permission back to the web application and an API call to the AWS IoT Device Shadow service to update the Thermostat1 temperature to 78°F.
     
    Figure 28: powercompany receives an ALLOW when SetTemperature is called with the desired temperature of 78°F

Note: As an optional exercise, we also made jane_doe a device owner for device Thermostat2. This can be observed in the users.json file in the GitHub repository. We encourage you to create your own policies and restrict functions for Thermostat2 after going through this post. You will need to create separate Verified Permissions policies and update the Lambda functions to interact with these policies.

We encourage you to create policies for guests and the power company and restrict permissions based on the following criteria:

  1. Verify Jane Doe can perform GetTemperature and SetTemperature actions on Thermostat2.
  2. John Doe should not be able to set the temperature on device Thermostat2 outside of the time range of 4:00 PM and 6:00 PM and outside of the temperature range of 68°F and 72°F.
  3. Power Company can only perform the GetTemperature operation, but there are no restrictions on time and outside temperature.

To help you verify the solution, we’ve provided the correct policies under the challenge directory in the GitHub repository.

Clean up

Deploying the Thermostat application in your AWS account will incur costs. To avoid ongoing charges, when you’re done examining the solution, delete the resources that were created. This includes the Amplify hosted web application, API Gateway resource, AWS Cloud9 environment, the Lambda function, DynamoDB table, Cognito user pool, AWS IoT Core resources, and Verified Permissions policy store.

Amplify resources can be deleted by going to the AWS CloudFormation console and deleting the stacks that were used to provision various services.

Conclusion

In this post, you learned about creating and managing fine-grained permissions using Verified Permissions for different user personas for your smart thermostat IoT device. With Verified Permissions, you can strengthen your security posture and build smart applications aligned with Zero Trust principles for real-time authorization decisions. To learn more, we recommend:

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Author

Rajat Mathur

Rajat is a Principal Solutions Architect at Amazon Web Services. Rajat is a passionate technologist who enjoys building innovative solutions for AWS customers. His core areas of focus are IoT, Networking, and Serverless computing. In his spare time, Rajat enjoys long drives, traveling, and spending time with family.

Pronoy Chopra

Pronoy is a Senior Solutions Architect with the Startups Generative AI team at AWS. He specializes in architecting and developing IoT and Machine Learning solutions. He has co-founded two startups and enjoys being hands-on with projects in the IoT, AI/ML and Serverless domain. His work in Magnetoencephalography has been cited many times in the effort to build better brain-compute interfaces.

Syed Sanoor

Syed serves as a Solutions Architect, assisting customers in the enterprise sector. With a foundation in software engineering, he takes pleasure in crafting solutions tailored to client needs. His expertise predominantly lies in C# and IoT. During his leisure time, Syed enjoys piloting drones and playing cricket.

Serverless IoT email capture, attachment processing, and distribution

Post Syndicated from Stacy Conant original https://aws.amazon.com/blogs/messaging-and-targeting/serverless-iot-email-capture-attachment-processing-and-distribution/

Many customers need to automate email notifications to a broad and diverse set of email recipients, sometimes from a sensor network with a variety of monitoring capabilities. Many sensor monitoring software products include an SMTP client to achieve this goal. However, managing email server infrastructure requires specialty expertise and operating an email server comes with additional cost and inherent risk of breach, spam, and storage management. Organizations also need to manage distribution of attachments, which could be large and potentially contain exploits or viruses. For IoT use cases, diagnostic data relevance quickly expires, necessitating retention policies to regularly delete content.

Solution Overview

This solution uses the Amazon Simple Email Service (SES) SMTP interface to receive SMTP client messages, and processes each message to replace its attachments with pre-signed URLs in the resulting email to its intended recipients. Attachments are stored separately in an Amazon Simple Storage Service (S3) bucket with a lifecycle policy implemented. This reduces the storage requirements of the recipient email servers receiving notification emails. Additionally, this solution leverages built-in anti-spam and security scanning capabilities to deal with spam and potentially malicious attachments, while at the same time providing the mechanism by which pre-signed attachment links can be revoked should the emails be distributed to unintended recipients.
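
The lifecycle policy on the attachment bucket only needs a single expiration rule. The following is a hedged boto3 sketch; the bucket name, rule ID, and 30-day retention period are assumptions, not values prescribed by the solution.

import boto3

s3 = boto3.client("s3")

# Expire stored attachments automatically so stale diagnostic data is removed
s3.put_bucket_lifecycle_configuration(
    Bucket="example-attachments-bucket",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "expire-attachments",
            "Status": "Enabled",
            "Filter": {"Prefix": ""},
            "Expiration": {"Days": 30},
        }]
    },
)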

The solution uses:

  • Amazon SES SMTP interface to receive incoming emails.
  • Amazon SES receipt rule on a (sub)domain controlled by administrators, to store raw incoming emails in an Amazon S3 bucket.
  • AWS Lambda function, triggered on S3 ObjectCreated event, to process raw emails, extract attachments, replace each with pre-signed URL with configurable expiry, and send the processed emails to intended recipients.

Solution Flow Details:

  1. SMTP client transmits email content to an email address in a (sub) domain with MX record set to Amazon SES service’s regional endpoint.
  2. Amazon SES SMTP interface receives an email and forwards it to SES Receipt Rule(s) for processing.
  3. A matching Amazon SES Receipt Rule saves incoming email into an Amazon S3 Bucket.
  4. Amazon S3 Bucket emits an S3 ObjectCreated Event, and places the event onto the Amazon Simple Queue Service (SQS) queue.
  5. The AWS Lambda service polls the inbound messages’ SQS queue and feeds events to the Lambda function.
  6. The Lambda function retrieves email files from the S3 bucket, parses the email sender/subject/body, saves attachments to a separate attachment S3 bucket (7), and replaces attachments with pre-signed URLs in the email body. The Lambda function then extracts intended recipient addresses from the email body. If the body contains a properly formatted recipient list, the email is then sent using the SES API (9); otherwise, a notice is posted to a fallback Amazon Simple Notification Service (SNS) Topic (8). A simplified sketch of this processing appears after this list.
  7. The Lambda function saves extracted attachments, if any, into an attachments bucket.
  8. Malformed email notifications are posted to a fallback Amazon SNS Topic.
  9. The Lambda function invokes Amazon SES API to send the processed email to all intended recipient addresses.
  10. If the Lambda function is unable to process email successfully, the inbound message is placed on to the SQS dead-letter queue (DLQ) queue for later intervention by the operator.
  11. SES delivers an email to each recipient’s mail server.
  12. Intended recipients download emails from their corporate mail servers and retrieve attachments from the S3 pre-signed URL(s) embedded in the email body.
  13. An alarm is triggered and a notification is published to Amazon SNS Alarms Topic whenever:
    • More than 50 failed messages are in the DLQ.
    • Oldest message on incoming SQS queue is older than 3 minutes – unable to keep up with inbound messages (flooding).
    • The incoming SQS queue contains over 180 messages (configurable) over 5 minutes old.
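
The following is a simplified, illustrative sketch of the attachment-processing step performed by the Lambda function (step 6). The bucket and environment variable names are assumptions, the recipient list is passed in rather than parsed from the body, and error handling, the SNS fallback, and the DLQ path are omitted; it is not the solution's actual code.

import email
import email.policy
import os

import boto3

s3 = boto3.client("s3")
ses = boto3.client("ses")

ATTACHMENT_BUCKET = os.environ["ATTACHMENT_BUCKET"]            # assumed env var
URL_EXPIRY_SECONDS = int(os.environ.get("URL_EXPIRY_SECONDS", "86400"))
SENDER_EMAIL = os.environ["SENDER_EMAIL"]

def process_raw_email(bucket, key, recipients):
    # 1. Read the raw email that the SES receipt rule stored in S3
    raw = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    message = email.message_from_bytes(raw, policy=email.policy.default)

    # 2. Move each attachment to the attachment bucket and build pre-signed URLs
    links = []
    for part in message.walk():
        filename = part.get_filename()
        if not filename:
            continue  # not an attachment
        attachment_key = f"{key}/{filename}"
        s3.put_object(Bucket=ATTACHMENT_BUCKET, Key=attachment_key,
                      Body=part.get_payload(decode=True))
        links.append(s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": ATTACHMENT_BUCKET, "Key": attachment_key},
            ExpiresIn=URL_EXPIRY_SECONDS))

    # 3. Rewrite the body with the links and send the processed email with SES
    body_part = message.get_body(preferencelist=("plain",))
    body = body_part.get_content() if body_part else ""
    body += "\n\nAttachments:\n" + "\n".join(links)
    ses.send_email(
        Source=SENDER_EMAIL,
        Destination={"ToAddresses": recipients},
        Message={"Subject": {"Data": str(message["Subject"] or "(no subject)")},
                 "Body": {"Text": {"Data": body}}})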

Setting up Amazon SES

For this solution you will need an email account where you can receive emails. You’ll also need a (sub)domain for which you control the mail exchanger (MX) record. You can obtain your (sub)domain either from Amazon Route53 or another domain hosting provider.

Verify the sender email address

You’ll need to follow the instructions to Verify an email address for all identities that you use as “From”, “Source”, “Sender”, or “Return-Path” addresses. You’ll also need to follow these instructions for any identities you wish to send emails to during initial testing while your SES account is in the “Sandbox” (see the next “Moving out of the SES Sandbox” section).

Moving out of the SES Sandbox

Amazon SES accounts are “in the Sandbox” by default, limiting email sending only to verified identities. AWS does this to prevent fraud and abuse as well as protecting your reputation as an email sender. When your account leaves the Sandbox, SES can send email to any recipient, regardless of whether the recipient’s address or domain is verified by SES. However, you still have to verify all identities that you use as “From”, “Source”, “Sender”, or “Return-Path” addresses.
Follow the Moving out of the SES Sandbox instructions in the SES Developer Guide. Approval is usually within 24 hours.

Set up the SES SMTP interface

Follow the workshop lab instructions to set up email sending from your SMTP client using the SES SMTP interface. Once you’ve completed this step, your SMTP client can open authenticated sessions with the SES SMTP interface and send emails. The workshop will guide you through the following steps:

  1. Create SMTP credentials for your SES account.
    • IMPORTANT: Never share SMTP credentials with unauthorized individuals. Anyone with these credentials can send as many SMTP requests and in whatever format/content they choose. This may result in end-users receiving emails with malicious content, administrative/operations overload, and unbounded AWS charges.
  2. Test your connection to ensure you can send emails.
  3. Authenticate using the SMTP credentials generated in step 1 and then send a test email from an SMTP client.

Verify your email domain and bounce notifications with Amazon SES

In order to replace email attachments with a pre-signed URL and other application logic, you’ll need to set up SES to receive emails on a domain or subdomain you control.

  1. Verify the domain that you want to use for receiving emails.
  2. Publish a mail exchanger record (MX record) and include the Amazon SES inbound receiving endpoint for your AWS Region (for example, inbound-smtp.us-east-1.amazonaws.com for US East (N. Virginia)) in the domain DNS configuration.
  3. Amazon SES automatically manages the bounce notifications whenever recipient email is not deliverable. Follow the Set up notifications for bounces and complaints guide to setup bounce notifications.

Deploying the solution

The solution is implemented using AWS CDK with Python. First clone the solution repository to your local machine or Cloud9 development environment. Then deploy the solution by entering the following commands into your terminal:

python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt

cdk deploy \
  --context SenderEmail=<verified sender email> \
  --context RecipientEmail=<recipient email address> \
  --context ConfigurationSetName=<configuration set name>

Note:

The RecipientEmail CDK context parameter in the cdk deploy command above can be any email address in the domain you verified as part of the Verify the domain step. In other words, if the verified domain is acme-corp.com, then the recipient can be any address in that domain.

The ConfigurationSetName CDK context value can be obtained by navigating to Identities in the Amazon SES console, selecting the verified domain (same as above), switching to the Configuration set tab, and selecting the name of the Default configuration set.

After deploying the solution, navigate to Amazon SES Email receiving in the AWS console, edit the rule set, and set it to Active.

Testing the solution end-to-end

Create a small file and generate a base64 encoding so that you can attach it to an SMTP message:

echo content >> demo.txt
cat demo.txt | base64 > demo64.txt
cat demo64.txt

Install openssl (which includes an SMTP client capability) using the following command:

sudo yum install openssl

Now run the SMTP client (openssl is used for the proof of concept, be sure to complete the steps in the workshop lab instructions first):

openssl s_client -crlf -quiet -starttls smtp -connect email-smtp.<aws-region>.amazonaws.com:587

and feed in the commands (replacing the brackets [] and everything between them) to send the SMTP message with the attachment you created.

EHLO amazonses.com
AUTH LOGIN
[base64 encoded SMTP user name]
[base64 encoded SMTP password]
MAIL FROM:[VERIFIED EMAIL IN SES]
RCPT TO:[VERIFIED EMAIL WITH SES RECEIPT RULE]
DATA
Subject: Demo from openssl
MIME-Version: 1.0
Content-Type: multipart/mixed;
 boundary="XXXXboundary text"

This is a multipart message in MIME format.

--XXXXboundary text
Content-Type: text/plain

Line1:This is a Test email sent to coded list of email addresses using the Amazon SES SMTP interface from openssl SMTP client.
Line2:Email_Rxers_Code:[ANYUSER1@DOMAIN_A,ANYUSER2@DOMAIN_B,ANYUSERX@DOMAIN_Y]:Email_Rxers_Code:
Line3:Last line.

--XXXXboundary text
Content-Type: text/plain;
Content-Transfer-Encoding: Base64
Content-Disposition: attachment; filename="demo64.txt"
Y29udGVudAo=
--XXXXboundary text
.
QUIT

Note: For the base64 SMTP username and password above, use the values obtained in Set up the SES SMTP interface, step 1. For example, if the username is AKZB3LJAF5TQQRRPQZO1, then you can obtain the base64 encoded value using the following command:

echo -n AKZB3LJAF5TQQRRPQZO1 |base64
QUtaQjNMSkFGNVRRUVJSUFFaTzE=

This makes the base64 encoded value QUtaQjNMSkFGNVRRUVJSUFFaTzE=. Repeat the same process for the SMTP username and password values in the example above.

The openssl session should result in successful SMTP authentication and message submission. You should then receive the processed email, with each attachment replaced by a pre-signed URL.

Optimizing Security of the Solution

  1. Do not share DNS credentials. Unauthorized access can lead to domain control, potential denial of service, and AWS charges. Restrict access to authorized personnel only.
  2. Do not set the SENDER_EMAIL environment variable to the email address associated with the receipt rule. This address is a closely guarded secret, known only to administrators, and should be changed frequently.
  3. Review access to your code repository regularly to ensure there are no unauthorized changes to your code base.
  4. Utilize Permissions Boundaries to restrict the actions permitted by an IAM user or role.

Cleanup

To cleanup, start by navigating to Amazon SES Email receiving in AWS console, and setting the rule set to Inactive.

Once completed, delete the stack:

cdk destroy

Cleanup AWS SES Access Credentials

In Amazon SES Console, select Manage existing SMTP credentials, select the username for which credentials were created in Set up the SES SMTP interface above, navigate to the Security credentials tab and in the Access keys section, select Action -> Delete to delete AWS SES access credentials.

Troubleshooting

If you are not receiving the email or the email is not being sent correctly, there are a number of common causes for these errors:

  • HTTP Error 554 Message rejected email address is not verified. The following identities failed the check in region :
    • This means that you have attempted to send an email from an address that has not been verified.
    • Ensure that the “MAIL FROM:[VERIFIED EMAIL IN SES]” email address sent via openssl matches the SenderEmail=<verified sender email> email address used in cdk deploy.
    • Also make sure this email address was used in the Verify the sender email address step.
  • Email is not being delivered/forwarded
    • The incoming S3 bucket, under the incoming prefix, contains a file called AMAZON_SES_SETUP_NOTIFICATION. This means that the MX record of the domain setup is missing. Validate that the MX record (step 2) of the Verify your email domain and bounce notifications with Amazon SES section is fully configured.
    • Ensure that, after deploying the solution, the created rule set was made active by navigating to Amazon SES Email receiving in the AWS console and setting it to Active.
    • The destination email address may have bounced. Navigate to the Amazon SES Suppression list in the AWS console and ensure that the recipient’s email is not in the suppression list. If it is listed, you can see the reason in the “Suppression reason” column. You may either manually remove the address from the suppression list or, if the recipient email is not valid, use a different recipient email address.
AWS Legal Disclaimer: Sample code, software libraries, command line tools, proofs of concept, templates, or other related technology are provided as AWS Content or Third-Party Content under the AWS Customer Agreement, or the relevant written agreement between you and AWS (whichever applies). You should not use this AWS Content or Third-Party Content in your production accounts, or on production or other critical data. You are responsible for testing, securing, and optimizing the AWS Content or Third-Party Content, such as sample code, as appropriate for production grade use based on your specific quality control practices and standards. Deploying AWS Content or Third-Party Content may incur AWS charges for creating or using AWS chargeable resources, such as running Amazon EC2 instances or using Amazon S3 storage.

About the Authors

Tarek Soliman

Tarek is a Senior Solutions Architect at AWS. His background is in Software Engineering with a focus on distributed systems. He is passionate about diving into customer problems and solving them. He also enjoys building things using software, woodworking, and hobby electronics.

Dave Spencer

Dave is a Senior Solutions Architect at AWS. His background is in cloud solutions architecture, Infrastructure as Code (IaC), systems engineering, and embedded systems programming. Dave’s passion is developing partnerships with Department of Defense customers to maximize technology investments and realize their strategic vision.

Ayman Ishimwe

Ayman is a Solutions Architect at AWS based in Seattle, Washington. He holds a Master’s degree in Software Engineering and IT from Oakland University. With prior experience in software development, specifically in building microservices for distributed web applications, he is passionate about helping customers build robust and scalable solutions on AWS cloud services following best practices.

Dmytro Protsiv

Dmytro is a Cloud Applications Architect with Amazon Web Services. He is passionate about helping customers to solve their business challenges around application modernization.

Stacy Conant

Stacy is a Solutions Architect working with DoD and US Navy customers. She enjoys helping customers understand how to harness big data and working on data analytics solutions. On the weekends, you can find Stacy crocheting, reading Harry Potter (again), playing with her dogs and cooking with her husband.

Security Vulnerability in Saflok’s RFID-Based Keycard Locks

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/03/security-vulnerability-in-safloks-rfid-based-keycard-locks.html

It’s pretty devastating:

Today, Ian Carroll, Lennert Wouters, and a team of other security researchers are revealing a hotel keycard hacking technique they call Unsaflok. The technique is a collection of security vulnerabilities that would allow a hacker to almost instantly open several models of Saflok-brand RFID-based keycard locks sold by the Swiss lock maker Dormakaba. The Saflok systems are installed on 3 million doors worldwide, inside 13,000 properties in 131 countries. By exploiting weaknesses in both Dormakaba’s encryption and the underlying RFID system Dormakaba uses, known as MIFARE Classic, Carroll and Wouters have demonstrated just how easily they can open a Saflok keycard lock. Their technique starts with obtaining any keycard from a target hotel—say, by booking a room there or grabbing a keycard out of a box of used ones—then reading a certain code from that card with a $300 RFID read-write device, and finally writing two keycards of their own. When they merely tap those two cards on a lock, the first rewrites a certain piece of the lock’s data, and the second opens it.

Dormakaba says that it’s been working since early last year to make hotels that use Saflok aware of their security flaws and to help them fix or replace the vulnerable locks. For many of the Saflok systems sold in the last eight years, there’s no hardware replacement necessary for each individual lock. Instead, hotels will only need to update or replace the front desk management system and have a technician carry out a relatively quick reprogramming of each lock, door by door. Wouters and Carroll say they were nonetheless told by Dormakaba that, as of this month, only 36 percent of installed Safloks have been updated. Given that the locks aren’t connected to the internet and some older locks will still need a hardware upgrade, they say the full fix will still likely take months longer to roll out, at the very least. Some older installations may take years.

If ever. My guess is that for many locks, this is a permanent vulnerability.

The Insecurity of Video Doorbells

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/03/the-insecurity-of-video-doorbells.html

Consumer Reports has analyzed a bunch of popular Internet-connected video doorbells. Their security is terrible.

First, these doorbells expose your home IP address and WiFi network name to the internet without encryption, potentially opening your home network to online criminals.

[…]

Anyone who can physically access one of the doorbells can take over the device—no tools or fancy hacking skills needed.

No, Toothbrushes Were Not Used in a Massive DDoS Attack

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/02/no-toothbrushes-were-not-used-in-a-massive-ddos-attack.html

The widely reported story last week that 1.5 million smart toothbrushes were hacked and used in a DDoS attack is false.

Near as I can tell, a German reporter talking to someone at Fortinet got it wrong, and then everyone else ran with it without reading the German text. It was a hypothetical, which Fortinet eventually confirmed.

Or maybe it was a stock-price hack.

On IoT Devices and Software Liability

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2024/01/on-iot-devices-and-software-liability.html

New law journal article:

Smart Device Manufacturer Liability and Redress for Third-Party Cyberattack Victims

Abstract: Smart devices are used to facilitate cyberattacks against both their users and third parties. While users are generally able to seek redress following a cyberattack via data protection legislation, there is no equivalent pathway available to third-party victims who suffer harm at the hands of a cyberattacker. Given how these cyberattacks are usually conducted by exploiting a publicly known and yet un-remediated bug in the smart device’s code, this lacuna is unreasonable. This paper scrutinises recent judgments from both the Supreme Court of the United Kingdom and the Supreme Court of the Republic of Ireland to ascertain whether these rulings pave the way for third-party victims to pursue negligence claims against the manufacturers of smart devices. From this analysis, a narrow pathway, which outlines how given a limited set of circumstances, a duty of care can be established between the third-party victim and the manufacturer of the smart device is proposed.

A Robot the Size of the World

Post Syndicated from Bruce Schneier original https://www.schneier.com/blog/archives/2023/12/a-robot-the-size-of-the-world.html

In 2016, I wrote about an Internet that affected the world in a direct, physical manner. It was connected to your smartphone. It had sensors like cameras and thermostats. It had actuators: Drones, autonomous cars. And it had smarts in the middle, using sensor data to figure out what to do and then actually do it. This was the Internet of Things (IoT).

The classical definition of a robot is something that senses, thinks, and acts—that’s today’s Internet. We’ve been building a world-sized robot without even realizing it.

In 2023, we upgraded the “thinking” part with large-language models (LLMs) like GPT. ChatGPT both surprised and amazed the world with its ability to understand human language and generate credible, on-topic, humanlike responses. But what these are really good at is interacting with systems formerly designed for humans. Their accuracy will get better, and they will be used to replace actual humans.

In 2024, we’re going to start connecting those LLMs and other AI systems to both sensors and actuators. In other words, they will be connected to the larger world, through APIs. They will receive direct inputs from our environment, in all the forms I thought about in 2016. And they will increasingly control our environment, through IoT devices and beyond.

It will start small: Summarizing emails and writing limited responses. Arguing with customer service—on chat—for service changes and refunds. Making travel reservations.

But these AIs will interact with the physical world as well, first controlling robots and then having those robots as part of them. Your AI-driven thermostat will turn the heat and air conditioning on based also on who’s in what room, their preferences, and where they are likely to go next. It will negotiate with the power company for the cheapest rates by scheduling usage of high-energy appliances or car recharging.

This is the easy stuff. The real changes will happen when these AIs group together in a larger intelligence: A vast network of power generation and power consumption with each building just a node, like an ant colony or a human army.

Future industrial-control systems will include traditional factory robots, as well as AI systems to schedule their operation. It will automatically order supplies, as well as coordinate final product shipping. The AI will manage its own finances, interacting with other systems in the banking world. It will call on humans as needed: to repair individual subsystems or to do things too specialized for the robots.

Consider driverless cars. Individual vehicles have sensors, of course, but they also make use of sensors embedded in the roads and on poles. The real processing is done in the cloud, by a centralized system that is piloting all the vehicles. This allows individual cars to coordinate their movement for more efficiency: braking in synchronization, for example.

These are robots, but not the sort familiar from movies and television. We think of robots as discrete metal objects, with sensors and actuators on their surface, and processing logic inside. But our new robots are different. Their sensors and actuators are distributed in the environment. Their processing is somewhere else. They’re a network of individual units that become a robot only in aggregate.

This turns our notion of security on its head. If massive, decentralized AIs run everything, then who controls those AIs matters a lot. It’s as if all the executive assistants or lawyers in an industry worked for the same agency. An AI that is both trusted and trustworthy will become a critical requirement.

This future requires us to see ourselves less as individuals, and more as parts of larger systems. It’s AI as nature, as Gaia—everything as one system. It’s a future more aligned with the Buddhist philosophy of interconnectedness than Western ideas of individuality. (And also with science-fiction dystopias, like Skynet from the Terminator movies.) It will require a rethinking of much of our assumptions about governance and economy. That’s not going to happen soon, but in 2024 we will see the first steps along that path.

This essay previously appeared in Wired.

Join AWS Hybrid Cloud & Edge Day to Learn How to Deploy Your Applications in the Everywhere Cloud

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/join-aws-hybrid-cloud-edge-day-to-learn-how-to-deploy-your-applications-in-the-everywhere-cloud/

In his keynote of AWS re:Invent 2021, Dr. Werner Vogels shared the insight of how “the everywhere cloud” is bringing AWS to new locales through AWS hardware and services and spotlighted it as one of his tech predictions for 2022 and beyond in his blog post.

“What we will see in 2022, and even more so in the years to come, is the cloud accelerating beyond the traditional centralized infrastructure model and into unexpected environments where specialized technology is needed. The cloud will be in your car, your tea kettle, and your TV. The cloud will be in everything from trucks driving down the road, to the ships and planes that transport goods. The cloud will be globally distributed, and connected to almost any digital device or system on Earth, and even in space.”

AWS provides a truly consistent and secure experience to build and run applications across the continuum of environments where customers operate—from the cloud to large metro areas, 5G networks, on-premises locations, and to mobile and Internet of Things (IoT) devices.

To learn more, join us for AWS Hybrid Cloud & Edge Day, a free-to-attend one-day virtual event on August 30, 2023, starting at 10:00 AM PDT (1:00 PM ET). We will stream the event simultaneously across multiple platforms, including LinkedIn Live, Twitter, YouTube, and Twitch.

You can hear from AWS leaders and industry analysts on the latest hybrid cloud and edge computing trends and emerging technologies and learn best practices for using AWS hybrid cloud and edge services across the cloud continuum. Also, learn from our customers on data strategies and key use cases and gain a deeper understanding of AWS hybrid cloud and edge services and new features and benefits.

Here are some of the highlights you can expect from this event:

Leadership session – To kick off the day, we have a leadership session featuring Jan Hofmeyr, vice president of EC2 Edge, sharing insights into how customers are building high-performance, intelligent applications with recently announced AWS hybrid cloud, edge, and IoT capabilities. Elias Khnaser, chief of research at EK Media Group, will join Jan to discuss the global, business, and economic trends impacting hybrid cloud and edge computing and discuss the customer requirements and use cases.

Cloud-closer sessions – We’ll discuss how AWS is bringing the cloud closer to metro areas and telco networks. Services such as AWS Local Zones, AWS Outposts family, and AWS Wavelength bring the power of cloud compute and storage to the edge of 5G networks, unlocking more performant mobile experiences. We’ll highlight new and innovative use cases, including Norton LifeLock, Electronic Arts, and Epic Games, who have taken advantage of the operational consistency between AWS Regions and the edge. You can also learn how to deploy in hybrid cloud scenarios at on-premises locations, with examples from MindBody and ElToro through Onica, and more customer cases.

On-premises sessions – Learn about our options to bring AWS Cloud to your data centers and on-premises locations for a truly consistent experience across your environments. We will review real-world examples of how AWS hybrid and edge services enable local processing of data for faster response time and faster decision-making. We will also share how Toyota takes advantage of hybrid options from Amazon ECS and Amazon EKS to use familiar management tools across environments and successfully modernize applications. You can learn how to meet your on-premises regulatory requirements effectively, with real-world scenarios covering critical aspects of digital sovereignty and data residency.

Rugged edge sessions – You will learn about AWS services that support the rugged, mobile, and disconnected edge, such as the AWS Snow Family, which enables organizations to deploy compute workloads in locations with denied, disrupted, intermittent, and limited (DDIL) connectivity. Learn how DDR.Live deployed their own 4G/LTE or 5G private network using AWS Private 5G for live events at venues with limited wireless connectivity. We will discuss the top use cases, such as deploying a pre-trained object detection model and architecting applications at the edge. Finally, we will discuss the benefits and requirements of operating at the edge with Holger Mueller, vice president and principal analyst, Constellation Research, Inc.

IoT panel discussion – We will hear from a panel of AWS IoT customers and industry experts about their innovation journeys. Join us to see how EuroTech brought to market a set of devices and services that improve operational efficiencies with connectivity at the edge. You’ll also hear how Wallbox, an electric vehicle charging company, reduced their operational costs and scaled efficiently with AWS IoT services.

Multicloud sessions – AWS has the tools to help you run and support your multicloud operations in the areas of governance, ops management, observability, and more. We will discuss common challenges in hybrid and multicloud environments and how AWS helps you manage, operate, and automate your processes. We’ll also talk about how Rackspace used AWS Systems Manager for instance patching across hybrid and multicloud environments, automating their infrastructure management across cloud providers.

This event is for any customer and builder who is eager to learn more about hybrid cloud, edge computing, IoT, networking, content delivery, and 5G. We’ll cover how you can support applications that need to remain on premises or at the edge due to low latency, local data processing, or data residency requirements.

To learn more details, see the event schedule, and register for AWS Hybrid Cloud & Edge Day, go to the event page.

Channy

How SOCAR handles large IoT data with Amazon MSK and Amazon ElastiCache for Redis

Post Syndicated from Younggu Yun original https://aws.amazon.com/blogs/big-data/how-socar-handles-large-iot-data-with-amazon-msk-and-amazon-elasticache-for-redis/

This is a guest blog post co-written with SangSu Park and JaeHong Ahn from SOCAR. 

As companies continue to expand their digital footprint, the importance of real-time data processing and analysis cannot be overstated. The ability to quickly measure and draw insights from data is critical in today’s business landscape, where rapid decision-making is key. With this capability, businesses can stay ahead of the curve and develop new initiatives that drive success.

This post is a continuation of How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control. In this post, we provide a detailed overview of streaming messages with Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon ElastiCache for Redis, covering technical aspects and design considerations that are essential for achieving optimal results.

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR wanted to design and build a solution for a new Fleet Management System (FMS). This system involves the collection, processing, storage, and analysis of Internet of Things (IoT) streaming data from various vehicle devices, as well as historical operational data such as location, speed, fuel level, and component status.

This post demonstrates a solution for SOCAR’s production application that allows them to load streaming data from Amazon MSK into ElastiCache for Redis, optimizing the speed and efficiency of their data processing pipeline. We also discuss the key features, considerations, and design of the solution.

Background

SOCAR operates about 20,000 cars and is planning to include other large vehicle types such as commercial vehicles and courier trucks. SOCAR has deployed in-car devices that capture data using AWS IoT Core. This data was then stored in Amazon Relational Database Service (Amazon RDS). The challenges with this approach included inefficient performance and high resource usage. Therefore, SOCAR looked for purpose-built databases tailored to the needs of their application and usage patterns, while meeting SOCAR’s future business and technical requirements. The key requirements included achieving maximum performance for real-time data analytics, which required storing data in an in-memory data store.

After careful consideration, ElastiCache for Redis was selected as the optimal solution due to its ability to handle complex data aggregation rules with ease. One of the challenges faced was loading data from Amazon MSK into the database, because there was no built-in Kafka connector and consumer available for this task. This post focuses on the development of a Kafka consumer application that was designed to tackle this challenge by enabling performant data loading from Amazon MSK to Redis.

Solution overview

Extracting valuable insights from streaming data can be a challenge for businesses with diverse use cases and workloads. That’s why SOCAR built a solution to seamlessly bring data from Amazon MSK into multiple purpose-built databases, while also empowering users to transform data as needed. With fully managed Apache Kafka, Amazon MSK provides a reliable and efficient platform for ingesting and processing real-time data.

The following figure shows an example of the data flow at SOCAR.

solution overview

This architecture consists of three components:

  • Streaming data – Amazon MSK serves as a scalable and reliable platform for streaming data, capable of receiving and storing messages from a variety of sources, including AWS IoT Core, with messages organized into multiple topics and partitions
  • Consumer application – With a consumer application, users can seamlessly bring data from Amazon MSK into a target database or data storage while also defining data transformation rules as needed
  • Target databases – With the consumer application, the SOCAR team was able to load data from Amazon MSK into two separate databases, each serving a specific workload

Although this post focuses on a specific use case with ElastiCache for Redis as the target database and a single topic called gps, the consumer application we describe can handle additional topics and messages, as well as different streaming sources and target databases such as Amazon DynamoDB. Our post covers the most important aspects of the consumer application, including its features and components, design considerations, and a detailed guide to the code implementation.

Components of the consumer application

The consumer application comprises three main parts that work together to consume, transform, and load messages from Amazon MSK into a target database. The following diagram shows an example of data transformations in the handler component.

consumer-application

The details of each component are as follows:

  • Consumer – This consumes messages from Amazon MSK and then forwards the messages to a downstream handler.
  • Loader – This is where users specify a target database. For example, SOCAR’s target databases include ElastiCache for Redis and DynamoDB.
  • Handler – This is where users can apply data transformation rules to the incoming messages before loading them into a target database.

Features of the consumer application

The consumer application has three features:

  • Scalability – This solution is designed to be scalable, ensuring that the consumer application can handle an increasing volume of data and accommodate additional applications in the future. For instance, SOCAR sought to develop a solution capable of handling not only the current data from approximately 20,000 vehicles but also a larger volume of messages as the business and data continue to grow rapidly.
  • Performance – With this consumer application, users can achieve consistent performance, even as the volume of source messages and target databases increases. The application supports multithreading, allowing for concurrent data processing, and can handle unexpected spikes in data volume by easily increasing compute resources.
  • Flexibility – This consumer application can be reused for any new topics without having to build the entire consumer application again. The consumer application can be used to ingest new messages with different configuration values in the handler. SOCAR deployed multiple handlers to ingest many different messages. Also, this consumer application allows users to add additional target locations. For example, SOCAR initially developed a solution for ElastiCache for Redis and then replicated the consumer application for DynamoDB.

Design considerations of the consumer application

Note the following design considerations for the consumer application:

  • Scale out – A key design principle of this solution is scalability. To achieve this, the consumer application runs with Amazon Elastic Kubernetes Service (Amazon EKS) because it can allow users to increase and replicate consumer applications easily.
  • Consumption patterns – To receive, store, and consume data efficiently, it’s important to design Kafka topics depending on messages and consumption patterns. Depending on how messages are consumed downstream, they can be received into multiple topics with different schemas. For example, SOCAR has many different topics that are consumed by different workloads.
  • Purpose-built database – The consumer application supports loading data into multiple target options based on the specific use case. For example, SOCAR stored real-time IoT data in ElastiCache for Redis to power real-time dashboard and web applications, while storing recent trip information in DynamoDB that didn’t require real-time processing.

Walkthrough overview

The producer of this solution is AWS IoT Core, which sends out messages into a topic called gps. The target database of this solution is ElastiCache for Redis. ElastiCache for Redis is a fast in-memory data store that provides sub-millisecond latency to power internet-scale, real-time applications. Built on open-source Redis and compatible with the Redis APIs, ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability from Amazon to power the most demanding real-time applications.

The target location can be either another database or storage depending on the use case and workload. SOCAR uses Amazon EKS to operate the containerized solution to achieve scalability, performance, and flexibility. Amazon EKS is a managed Kubernetes service to run Kubernetes in the AWS Cloud. Amazon EKS automatically manages the availability and scalability of the Kubernetes control plane nodes responsible for scheduling containers, managing application availability, storing cluster data, and other key tasks.

For the programming language, the SOCAR team decided to use the Go programming language, utilizing both the AWS SDK for Go and goroutines, lightweight logical or virtual threads managed by the Go runtime that make it easy to run many tasks concurrently. The AWS SDK for Go simplifies the use of AWS services by providing a set of libraries that are consistent and familiar for Go developers.
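To illustrate the concurrency model, the following is a minimal, hypothetical sketch (not SOCAR’s production code) of how goroutines and a sync.WaitGroup can fan out work across messages:

package main

import (
	"fmt"
	"sync"
)

// process stands in for the real handler logic applied to each message.
func process(msg string) {
	fmt.Println("handled:", msg)
}

func main() {
	messages := []string{"gps-1", "gps-2", "gps-3"}

	var wg sync.WaitGroup
	for _, msg := range messages {
		wg.Add(1)
		// Each message is handled in its own goroutine, mirroring how the
		// consumer application fans out work per Kafka message.
		go func(m string) {
			defer wg.Done()
			process(m)
		}(msg)
	}
	wg.Wait() // block until every goroutine has finished
}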

In the following sections, we walk through the steps to implement the solution:

  1. Create a consumer.
  2. Create a loader.
  3. Create a handler.
  4. Build a consumer application with the consumer, loader, and handler.
  5. Deploy the consumer application.

Prerequisites

For this walkthrough, you should have the following:

Create a consumer

In this example, we use a topic called gps, and the consumer includes a Kafka client that receives messages from the topic. SOCAR created a struct and built a consumer (called NewConsumer in the code) to make it extendable. With this approach, any additional parameters and rules can be added easily.

To authenticate with Amazon MSK, SOCAR uses IAM. Because SOCAR already uses IAM to authenticate other resources, such as Amazon EKS, it uses the same IAM role, together with the aws_msk_iam_v2 SASL mechanism, to authenticate clients for both Amazon MSK and Apache Kafka actions.

The following code creates the consumer:

type Consumer struct {
	logger      *zerolog.Logger
	kafkaReader *kafka.Reader
}

func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer {
	return &Consumer{
		logger: logger,
		kafkaReader: kafka.NewReader(kafka.ReaderConfig{
			Dialer: &kafka.Dialer{
				TLS:           &tls.Config{MinVersion: tls.VersionTLS12},
				Timeout:       10 * time.Second,
				DualStack:     true,
				SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg),
			},
			Brokers:     brokers,
			GroupID:     consumerGroupID,
			Topic:       topic,
			StartOffset: kafka.LastOffset,
		}),
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) {
	return consumer.kafkaReader.ReadMessage(ctx)
}

Create a loader

The loader function, represented by the Loader struct, is responsible for loading messages to the target location, which in this case is ElastiCache for Redis. The NewLoader function initializes a new instance of the Loader struct with a logger and a Redis cluster client, which is used to communicate with the ElastiCache cluster. The redis.NewClusterClient object is initialized using the NewRedisClient function, which uses IAM to authenticate the client for Redis actions. This ensures secure and authorized access to the ElastiCache cluster. The Loader struct also contains a Close method to close the Redis client and free up resources.

The following code creates a loader:

type Loader struct {
	logger      *zerolog.Logger
	redisClient *redis.ClusterClient
}

func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader {
	return &Loader{
		logger:      logger,
		redisClient: redisClient,
	}
}

func (loader *Loader) Close() error {
	var err error = nil
	if loader.redisClient != nil {
		// Release the Redis cluster client so the connector can shut down cleanly.
		err = loader.redisClient.Close()
		loader.logger.Info().Msg("closed redis client")
	}
	return err
}

func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) {
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		NewClient: func(opt *redis.Options) *redis.Client {
			return redis.NewClient(&redis.Options{
				Addr: opt.Addr,
				CredentialsProvider: func() (username string, password string) {
					token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username)
					if err != nil {
						panic(err)
					}
					return opt.Username, token
				},
				PoolSize:    opt.PoolSize,
				PoolTimeout: opt.PoolTimeout,
				TLSConfig:   &tls.Config{InsecureSkipVerify: true},
			})
		},
		Addrs:       addrs,
		Username:    username,
		PoolSize:    100,
		PoolTimeout: 1 * time.Minute,
	})
	pong, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}
	if pong != "PONG" {
		return nil, fmt.Errorf("failed to verify connection to redis server")
	}
	return redisClient, nil
}
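The NewRedisClient function calls a BuildRedisIAMAuthToken helper that isn’t shown in the post. The following is a hypothetical sketch of such a helper, based on the documented ElastiCache IAM authentication scheme (a SigV4-presigned connect request with the URL scheme removed); SOCAR’s actual implementation may differ:

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
)

// BuildRedisIAMAuthToken presigns a connect request for the replication group
// and returns the presigned URL, without the scheme, as the IAM auth token.
func BuildRedisIAMAuthToken(ctx context.Context, awsCfg aws.Config, replicationGroupID, username string) (string, error) {
	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/", replicationGroupID), nil)
	if err != nil {
		return "", err
	}
	q := req.URL.Query()
	q.Set("Action", "connect")
	q.Set("User", username)
	q.Set("X-Amz-Expires", "900") // token validity in seconds
	req.URL.RawQuery = q.Encode()

	creds, err := awsCfg.Credentials.Retrieve(ctx)
	if err != nil {
		return "", err
	}
	// SHA-256 hash of an empty payload, required when presigning a GET request.
	const emptyPayloadHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
	signedURL, _, err := v4.NewSigner().PresignHTTP(ctx, creds, req, emptyPayloadHash, "elasticache", awsCfg.Region, time.Now())
	if err != nil {
		return "", err
	}
	return strings.TrimPrefix(signedURL, "http://"), nil
}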

Create a handler

A handler is used to include business rules and data transformation logic that prepares data before loading it into the target location. It acts as a bridge between a consumer and a loader. In this example, the topic name is cars.gps.json, and the message includes two keys, lng and lat, with data type Float64. The business logic can be defined in a function like handlerFuncGpsToRedis and then applied as follows:

type (
	handlerFunc    func(ctx context.Context, loader *Loader, key, value []byte) error
	handlerFuncMap map[string]handlerFunc
)

var HandlerRedis = handlerFuncMap{
	"cars.gps.json": handlerFuncGpsToRedis,
}

func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) {
	handlerFunc, exist := funcMap[topic]
	if !exist {
		return nil, fmt.Errorf("failed to find handler func for '%s'", topic)
	}
	return handlerFunc, nil
}

func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error {
	// unmarshal raw data to map
	data := map[string]interface{}{}
	err := json.Unmarshal(value, &data)
	if err != nil {
		return err
	}

	// prepare things to load on redis as geolocation
	name := string(key)
	lng, err := getFloat64ValueFromMap(data, "lng")
	if err != nil {
		return err
	}
	lat, err := getFloat64ValueFromMap(data, "lat")
	if err != nil {
		return err
	}

	// add geolocation to redis
	return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}
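The handler relies on two helpers that aren’t shown in the post: getFloat64ValueFromMap and the loader’s RedisGeoAdd method. The following is a minimal sketch of what they might look like, assuming the go-redis cluster client used by the loader; the actual implementation may differ:

// getFloat64ValueFromMap extracts a float64 value for the given key from the
// unmarshaled JSON map, returning an error if the key is missing or not a number.
func getFloat64ValueFromMap(data map[string]interface{}, key string) (float64, error) {
	value, exist := data[key]
	if !exist {
		return 0, fmt.Errorf("key '%s' not found in message", key)
	}
	number, ok := value.(float64)
	if !ok {
		return 0, fmt.Errorf("value for key '%s' is not a number", key)
	}
	return number, nil
}

// RedisGeoAdd stores the position as a member of a Redis geospatial set so it
// can later be queried with commands such as GEOSEARCH.
func (loader *Loader) RedisGeoAdd(ctx context.Context, key, name string, lng, lat float64) error {
	return loader.redisClient.GeoAdd(ctx, key, &redis.GeoLocation{
		Name:      name,
		Longitude: lng,
		Latitude:  lat,
	}).Err()
}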

Build a consumer application with the consumer, loader, and handler

Now you have created the consumer, loader, and handler. The next step is to build a consumer application using them. In a consumer application, you read messages from your stream with a consumer, transform them using a handler, and then load transformed messages into a target location with a loader. These three components are parameterized in a consumer application function such as the one shown in the following code:

type Connector struct {
	ctx    context.Context
	logger *zerolog.Logger

	consumer *Consumer
	handler  handlerFuncMap
	loader   *Loader
}

func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector {
	return &Connector{
		ctx:    ctx,
		logger: logger,

		consumer: consumer,
		handler:  handler,
		loader:   loader,
	}
}

func (connector *Connector) Close() error {
	var err error = nil
	if connector.consumer != nil {
		err = connector.consumer.Close()
	}
	if connector.loader != nil {
		err = connector.loader.Close()
	}
	return err
}

func (connector *Connector) Run() error {
	wg := sync.WaitGroup{}
	defer wg.Wait()
	handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic)
	if err != nil {
		return err
	}
	for {
		msg, err := connector.consumer.Consume(connector.ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				break
			}
			connector.logger.Err(err).Msg("failed to consume message")
			continue
		}

		wg.Add(1)
		go func(key, value []byte) {
			defer wg.Done()
			if herr := handlerFunc(connector.ctx, connector.loader, key, value); herr != nil {
				connector.logger.Err(herr).Msg("")
			}
		}(msg.Key, msg.Value)
	}
	return nil
}
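To show how the pieces fit together, the following is an illustrative sketch (not the production entry point) that wires a consumer, the handler map, and a loader into a connector. The broker addresses, Redis endpoints, consumer group, and user name are placeholders:

import (
	"context"
	"os"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/rs/zerolog"
)

func main() {
	ctx := context.Background()
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to load AWS config")
	}

	// Placeholder endpoints; supply your own MSK brokers and ElastiCache nodes.
	consumer := NewConsumer(&logger, awsCfg, []string{"b-1.example:9098"}, "gps-consumer-group", "cars.gps.json")
	redisClient, err := NewRedisClient(ctx, awsCfg, []string{"redis.example:6379"}, "my-replication-group", "app-user")
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to create redis client")
	}
	loader := NewLoader(&logger, redisClient)

	connector := NewConnector(ctx, &logger, consumer, HandlerRedis, loader)
	defer connector.Close()

	if err := connector.Run(); err != nil {
		logger.Fatal().Err(err).Msg("connector stopped with error")
	}
}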

Deploy the consumer application

To achieve maximum parallelism, SOCAR containerizes the consumer application and deploys it into multiple pods on Amazon EKS. Each consumer application contains a unique consumer, loader, and handler. For example, if you need to receive messages from a single topic with five partitions, you can deploy five identical consumer applications, each running in its own pod. Similarly, if you have two topics with three partitions each, you should deploy two consumer applications, resulting in a total of six pods. It’s a best practice to run one consumer application per topic, and the number of pods should match the number of partitions to enable concurrent message processing. The pod count can be specified in the Kubernetes deployment configuration.

There are two stages in the Dockerfile. The first stage is the builder, which installs build tools and dependencies, and builds the application. The second stage is the runner, which uses a smaller base image (Alpine) and copies only the necessary files from the builder stage. It also sets the appropriate user permissions and runs the application. It’s also worth noting that the builder stage uses a specific version of the Golang image, while the runner stage uses a specific version of the Alpine image, both of which are considered to be lightweight and secure images.

The following code is an example of the Dockerfile:

# builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector .

# runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]

Conclusion

In this post, we discussed SOCAR’s approach to building a consumer application that enables IoT real-time streaming from Amazon MSK to target locations such as ElastiCache for Redis. We hope you found this post informative and useful. Thank you for reading!


About the Authors

SangSu Park is the Head of Operation Group at SOCAR. His passion is to keep learning, embrace challenges, and strive for mutual growth through communication. He loves to travel in search of new cities and places.

JaeHong Ahn is a DevOps Engineer in SOCAR’s cloud infrastructure team. He is dedicated to promoting collaboration between developers and operators. He enjoys creating DevOps tools and is committed to using his coding abilities to help build a better world. He loves to cook delicious meals as a private chef for his wife.

Younggu Yun works at AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.

Processing geospatial IoT data with AWS IoT Core and the Amazon Location Service

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/compute/processing-geospatial-iot-data-with-aws-iot-core-and-the-amazon-location-service/

This post is written by Swarna Kunnath (Cloud Application Architect), and Anand Komandooru (Sr. Cloud Application Architect).

This blog post shows how to republish messages that arrive from Internet of Things (IoT) devices across AWS accounts using a replatforming approach. A replatforming approach minimizes changes to the core application architecture, allowing an organization to reduce risk and meet business needs more quickly. In this post, you also learn how to track an IoT device’s location using the Amazon Location Service.

The example used in this post relates to an aviation company that has airplanes with line replacement unit devices or transponders. Transponders are IoT devices that send airplane geospatial data (location and altitude) to the AWS IoT Core service. The company’s airplane transponders send location data to the AWS IoT Core service provisioned in an existing AWS account (source account). The solution required manual intervention to track airplane location sent by the transponders.

They must rearchitect their application due to an internal reorganization event. As part of the rearchitecture approach, the business decides to enhance the application to process the transponder messages in another AWS account (destination account). In addition, the business needs full automation of the airplane’s location tracking process, to minimize the risk of the application changes, and to deliver the changes quickly.

Solution overview

The high-level solution republishes the IoT messages from the source account to the destination account using AWS IoT Core, Amazon SQS, AWS Lambda, and integrates the application with Amazon Location Service. IoT messages are replicated to an IoT topic in the destination account for downstream processing, minimizing changes to the original application architecture. Integration with Amazon Location Service automates the process of device location tracking and alert generation.

The AWS IoT platform allows you to connect your internet-enabled devices to the AWS Cloud via MQTT, HTTP, or WebSocket protocol. Once connected, the devices send data to the MQTT topics. Data ingested on MQTT topics is routed into AWS services (Amazon S3, SQS, Amazon DynamoDB, and Lambda) by configuring rules in the AWS IoT Rules Engine. The AWS IoT Rules Engine offers ways to define queries to format and filter messages published by these devices, and supports integration with several other AWS services as targets.

The Amazon Location Service lets you add geospatial data including capabilities such as maps, points of interest, geocoding, routing, geofences, and tracking. The tracker with geofence tracks the location of the device based on the geospatial data in the published IoT messages. Amazon Location Service generates enter and exit events and integrates with Amazon EventBridge and Amazon Simple Notification Service (Amazon SNS) to generate alerts based on defined filters in EventBridge rules.

The solution in this post delivers high availability, scalability, and cost efficiency by using serverless and managed services. The serverless services used by this solution also provide automatic scaling and built-in high availability. Integrating Amazon Location Service with AWS IoT and EventBridge helps to automate the auditing and processing of geospatial messages.

Solution architecture

These steps describe an end-to-end sequence of events:

  1. An IoT device (a transponder in an airplane) publishes a message to the AWS IoT Core service in the source account.
  2. The message arrives at an AWS IoT Core topic in the source account.
  3. AWS IoT Rules Engine receives the message and processes it, using IoT rules attached to the corresponding topic in the source account.
  4. An AWS IoT rule replicates the message to an SQS queue in the destination account.
  5. A Lambda function in the destination account polls the SQS queue and publishes received messages in batches to the destination account IoT topic (a sketch of such a function follows this list).
  6. The Location action configured to the IoT rule sends the messages to Amazon Location Service tracker from the IoT topic.
  7. An Amazon Location tracker sends events when an IoT device enters or exits a linked geofence.
  8. EventBridge receives these events and, via the configured event rule, sends out SNS notifications for the configured devices.
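The following is a hypothetical Go sketch of the kind of Lambda function described in step 5. The environment variable name and the use of the IoT data plane Publish API are assumptions for illustration, not the exact code from the solution:

package main

import (
	"context"
	"log"
	"os"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/iotdataplane"
)

var client *iotdataplane.Client

// handler republishes each replicated transponder message to the destination IoT topic.
func handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	topic := os.Getenv("DESTINATION_IOT_TOPIC") // assumed environment variable
	for _, record := range sqsEvent.Records {
		if _, err := client.Publish(ctx, &iotdataplane.PublishInput{
			Topic:   aws.String(topic),
			Payload: []byte(record.Body),
		}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	client = iotdataplane.NewFromConfig(cfg)
	lambda.Start(handler)
}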

Prerequisites

This example has the following prerequisites:

  1. Access to the AWS services mentioned in this blog post within two AWS Accounts.
  2. A local install of AWS SAM CLI to build and deploy the sample code.

Solution walkthrough

To deploy this solution, first deploy the IoT components via the AWS Serverless Application Model (AWS SAM) in the source and destination accounts. Afterward, configure Amazon Location Service resources in the destination account. To learn more, visit the AWS SAM deployment documentation.

Deploying the code

Deploy the following AWS SAM templates in order:

To build and deploy the code, run:

sam build --template <TemplateName>.yaml
sam deploy --guided

Configuring a tracker

Amazon Location Trackers send device location updates that provide data to retrieve current and historical locations for devices.

Using Amazon Location Trackers and Amazon Location Geofences together, you can automatically evaluate the location updates from your IoT devices against your geofences to generate the geofence events. Actions could be taken to generate the alerts based on the areas of interest.

  1. Follow the instructions in the documentation to create the tracker resource from the AWS Management Console (a programmatic sketch using the AWS SDK for Go follows this list). Use this information for the new tracker:
    • Name: Enter a unique name that has a maximum of 100 characters. For example, FlightTracker.
    • Description: Enter an optional description. For example, Tracker for storing device positions.
  2. Configure a Location action to the destination IoT rule that receives messages from the destination IoT topic and publishes them in batches to the configured Tracker device (for example, FlightTracker). The parameters in the JSON data that is returned to the Location action can also be configured via substitution templates.
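If you prefer to create the tracker programmatically, the following hypothetical Go sketch uses the Amazon Location Service API; the tracker name and description simply mirror the console example above:

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/location"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := location.NewFromConfig(cfg)

	// Create a tracker that will receive device position updates from the IoT rule.
	out, err := client.CreateTracker(ctx, &location.CreateTrackerInput{
		TrackerName: aws.String("FlightTracker"),
		Description: aws.String("Tracker for storing device positions"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("created tracker:", aws.ToString(out.TrackerArn))
}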

Geofence collection

Geofences contain points and vertices that form a closed boundary, which defines an area of interest. For example, flight origin and destination details. You can use tools, such as GeoJSON.io, to draw geofences and save the output as a GeoJSON file. Follow the instructions in the documentation to create the GeoJSON file and link it to the geofence collection.

  1. Create the geofence collection with a GeoJSON file and link it to the tracker you just created.
  2. Link the tracker to the geofence by following these instructions and start tracking the device’s location updates. You can link them together so that you automatically evaluate location updates against all your geofences. You can evaluate device positions against geofences on demand as well.

When device positions are evaluated against geofences, they generate events. For example, when a plane enters or exits a location specified in the geofence.

You can configure EventBridge with rules to react to these events. You can set up SNS to notify your clients when a specific tracker device location changes. Follow the instructions in the documentation on how to set up EventBridge rules to integrate with Amazon Location Service events.
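As an illustration, the following hypothetical Go sketch creates an EventBridge rule that matches Amazon Location geofence events and sends them to an SNS topic; the rule name, topic ARN, and account details are placeholders to adapt to your own setup:

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/eventbridge"
	"github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := eventbridge.NewFromConfig(cfg)

	// Match geofence enter/exit events emitted by Amazon Location Service.
	pattern := `{"source":["aws.geo"],"detail-type":["Location Geofence Event"]}`
	if _, err := client.PutRule(ctx, &eventbridge.PutRuleInput{
		Name:         aws.String("FlightGeofenceEvents"), // placeholder rule name
		EventPattern: aws.String(pattern),
	}); err != nil {
		log.Fatal(err)
	}

	// Route matched events to an SNS topic that notifies clients.
	if _, err := client.PutTargets(ctx, &eventbridge.PutTargetsInput{
		Rule: aws.String("FlightGeofenceEvents"),
		Targets: []types.Target{
			{Id: aws.String("sns-notify"), Arn: aws.String("arn:aws:sns:us-east-1:123456789012:GeofenceAlerts")},
		},
	}); err != nil {
		log.Fatal(err)
	}
}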

Testing the solution

You can test the first part of the solution by sending an IoT message with location details in the JSON format from the source account and verify that the message arrives at the destination account SQS queue. Detailed instructions to publish a test message from the source account that includes location information (latitude and longitude) can be found here.

Messages from the destination account SQS queue are published to the Amazon Location Service Tracker. When the location in the test message matches the criteria provided in the geofence, Amazon Location Service generates an event. EventBridge has a rule configured that gets matched when an Amazon Location tracker event arrives, and the rule target is an SNS topic that sends an email or text message to the client.

Cleaning up

To avoid incurring future charges, delete the CloudFormation stacks, location tracker, and geofence collection created as part of the solution walk-through. Replace the resource identifiers in the following commands with the ID/name of the resources.

  1. Delete the SAM application stack:
    aws cloudformation delete-stack --stack-name <StackName>
    

    Refer to this documentation for further information.

  2. Delete the location tracker:
    aws location delete-tracker --tracker-name <TrackerName>
  3. Delete the geofence collection:
    aws location delete-geofence-collection --collection-name <GeoCollectionName>

Conclusion

This blog post shows how to create a serverless solution for cross-account IoT message publishing and tracking device location updates using Amazon Location Service.

It describes the process of how to publish AWS IoT messages across multiple accounts. Integration with the Amazon Location Service shows how to track IoT device location updates and generate alerts, alleviating the need for manual device location tracking.

For more serverless learning resources, visit Serverless Land.

AWS Week in Review – November 7, 2022

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/aws-week-in-review-november-7-2022/

With three weeks to go until AWS re:Invent opens in Las Vegas, the AWS News Blog Team is hard at work creating blog posts to share the latest launches and previews with you. As usual, we have a strong mix of new services, new features, and a surprise or two.

Last Week’s Launches
Here are some launches that caught my eye last week:

Amazon SNS Data Protection and Masking – After a quick public preview, this cool feature is now generally available. It uses pattern matching, machine learning models, and content policies to help protect data at scale. You can find many different kinds of personally identifiable information (PII) and protected health information (PHI) in message bodies and either block message delivery or mask (de-identify) the sensitive data, all in real-time and on a per-topic basis. To learn more, read the blog post or the message data protection documentation.

Amazon Textract Updates – This service extracts text, handwriting, and data from any document or image. This past week we updated the AnalyzeID function so that it can now extract the machine readable zone (MRZ) on passports issued by the United States, and we added the entire OCR output to the API response. We also updated the machine learning models that power the AnalyzeDocument function, with a focus on single-character boxed forms commonly found on tax and immigration documents. Finally, we updated the AnalyzeExpense function with support for new fields and higher accuracy for existing fields, bringing the total field count to more than 40.

Another Amazon Braket Processor – Our quantum computing service now supports Aquila, a new 256-qubit quantum computer from QuEra that is based on a programmable array of neutral Rubidium atoms. According to the What’s New, Aquila supports the Analog Hamiltonian Simulation (AHS) paradigm, allowing it to solve for the static and dynamic properties of quantum systems composed of many interacting particles.

Amazon S3 on Outposts – This service now lets you use additional S3 Lifecycle rules to optimize capacity management. You can expire objects as they age or are replaced with newer versions, with control at the bucket level, or for subsets defined by prefixes, object tags, or object sizes. There’s more info in the What’s New and in the S3 documentation.

AWS CloudFormation – There were two big updates last week: support for Amazon RDS Multi-AZ deployments with two readable standbys, and better access to detailed information on failed stack instances for operations on CloudFormation StackSets.

Amazon MemoryDB for Redis – You can now use data tiering as a lower-cost way to scale your clusters up to hundreds of terabytes of capacity. This new option uses a combination of instance memory and SSD storage in each cluster node, with all data stored durably in a multi-AZ transaction log. There’s more information in the What’s New and the blog post.

Amazon EC2 – You can now remove launch permissions for Amazon Machine Images (AMIs) that are directly shared with your AWS account.

X in Y – We launched existing AWS services and instance types in additional Regions:

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Here are some additional news items that you may find interesting:

AWS Open Source News and Updates – My colleague Ricardo Sueiras highlights new open source projects, tools, and demos from the AWS Community. Read Installment 134 to see what’s going on!

New Case Study – A new AWS case study describes how Taggle (a company focused on smart water solutions in Australia) created an IoT platform that runs on AWS and uses Amazon Kinesis Data Streams to ingest and store data in real time. Using AWS allowed them to scale to accommodate 80,000 additional sensors that will roll out in 2022.

Upcoming AWS Events
re:Invent 2022 – AWS re:Invent is just three weeks away! Join us live from November 28th to December 2nd for keynotes, training and certification opportunities, and over 1,500 technical sessions. If you cannot make it to Las Vegas you can also join us online to watch the keynotes and leadership sessions live. Be sure to check out the re:Invent 2022 Attendee Guides, each curated by an AWS Hero, AWS industry team, or AWS partner.

PeerTalk – If you will be attending re:Invent in person and are interested in meeting with me or any of our featured experts, be sure to check out PeerTalk, our new onsite networking program.

That’s all for this week!

Jeff;

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS.