Simulated location data with Amazon Location Service

Post Syndicated from Aaron Sempf original https://aws.amazon.com/blogs/devops/simulated-location-data-with-amazon-location-service/

Modern location-based applications require the processing and storage of real-world assets in real-time. The recent release of Amazon Location Service and its Tracker feature makes it possible to quickly and easily build these applications on the AWS platform. Tracking real-world assets is important, but at some point when working with Location Services you will need to demo or test location-based applications without real-world assets.

Applications that track real-world assets are difficult to test and demo in a realistic setting, and it can be hard to get your hands on large amounts of real-world location data. Furthermore, not every company or individual in the early stages of developing a tracking application has access to a large fleet of test vehicles from which to derive this data.

Location data can also be considered highly sensitive, because it can be easily de-anonymized to identify individuals and movement patterns. Therefore, only a few openly accessible datasets exist and are unlikely to exhibit the characteristics required for your particular use-case.

To overcome this problem, the location-based services community has developed multiple openly available location data simulators. This blog will demonstrate how to connect one of those simulators to Amazon Location Service Tracker to test and demo your location-based services on AWS.

Walk-through

Part 1: Create a tracker in Amazon Location Service

This walkthrough will demonstrate how to get started setting up simulated data into your tracker.

Amazon location Service console
Step 1: Navigate to Amazon Location Service in the AWS Console and select “Trackers“.

Step 2: On the “Trackers” screen click the orange “Create tracker“ button.

Select Create Tracker

Create a Tracker form

Step 3: On the “Create tracker” screen, name your tracker and make sure to reply “Yes” to the question asking you if you will only use simulated or sample data. This allows you to use the free-tier of the service.

Next, click “Create tracker” to create you tracker.

Confirm create tracker

Done. You’ve created a tracker. Note the “Name” of your tracker.

Generate trips with the SharedStreets Trip-simulator

A common option for simulating trip data is the shared-steets/trip-simulator project.

SharedStreets maintains an open-source project on GitHub – it is a probabilistic, multi-agent GPS trajectory simulator. It even creates realistic noise, and thus can be used for testing algorithms that must work under real-world conditions. Of course, the generated data is fake, so privacy is not a concern.

The trip-simulator generates files with a single GPS measurement per line. To playback those files to the Amazon Location Service Tracker, you must use a tool to parse the file; extract the GPS measurements, time measurements, and device IDs of the simulated vehicles; and send them to the tracker at the right time.

Before you start working with the playback program, the trip-simulator requires a map to simulate realistic trips. Therefore, you must download a part of OpenStreetMap (OSM). Using GeoFabrik you can download extracts at the size of states or selected cities based on the area within which you want to simulate your data.

This blog will demonstrate how to simulate a small fleet of cars in the greater Munich area. The example will be written for OS-X, but it generalizes to Linux operating systems. If you have a Windows operating system, I recommend using Windows Subsystem for Linux (WSL). Alternatively, you can run this from a Cloud9 IDE in your AWS account.

Step 1: Download the Oberbayern region from download.geofabrik.de

Prerequisites:

curl https://download.geofabrik.de/europe/germany/bayern/oberbayern-latest.osm.pbf -o oberbayern-latest.osm.pbf

Step 2: Install osmium-tool

Prerequisites:

brew install osmium-tool

Step 3: Extract Munich from the Oberbayern map

osmium extract -b "11.5137,48.1830,11.6489,48.0891" oberbayern-latest.osm.pbf -o ./munich.osm.pbf -s "complete_ways" --overwrite

Step 4: Pre-process the OSM map for the vehicle routing

Prerequisites:

docker run -t -v $(pwd):/data osrm/osrm-backend:v5.25.0 osrm-extract -p /opt/car.lua /data/munich.osm.pbf
docker run -t -v $(pwd):/data osrm/osrm-backend:v5.25.0 osrm-contract /data/munich.osrm

Step 5: Install the trip-simulator

Prerequisites:

npm install -g trip-simulator

Step 6: Run a 10 car, 30 minute car simulation

trip-simulator \
  --config car \
  --pbf munich.osm.pbf \
  --graph munich.osrm \
  --agents 10 \
  --start 1563122921000 \
  --seconds 1800 \
  --traces ./traces.json \
  --probes ./probes.json \
  --changes ./changes.json \
  --trips ./trips.json

The probes.json file is the file containing the GPS probes we will playback to Amazon Location Service.

Part 2: Playback trips to Amazon Location Service

Now that you have simulated trips in the probes.json file, you can play them back in the tracker created earlier. For this, you must write only a few lines of Python code. The following steps have been neatly separated into a series of functions that yield an iterator.

Prerequisites:

Step 1: Load the probes.json file and yield each line

import json
import time
import datetime
import boto3

def iter_probes_file(probes_file_name="probes.json"):
    """Iterates a file line by line and yields each individual line."""
    with open(probes_file_name) as probes_file:
        while True:
            line = probes_file.readline()
            if not line:
                break
            yield line

Step 2: Parse the probe on each line
To process the probes, you parse the JSON on each line and extract the data relevant for the playback. Note that the coordinates order is longitude, latitude in the probes.json file. This is the same order that the Location Service expects.

def parse_probes_trip_simulator(probes_iter):
    """Parses a file witch contains JSON document, one per line.
    Each line contains exactly one GPS probe. Example:
    {"properties":{"id":"RQQ-7869","time":1563123002000,"status":"idling"},"geometry":{"type":"Point","coordinates":[-86.73903753135207,36.20418779626351]}}
    The function returns the tuple (id,time,status,coordinates=(lon,lat))
    """
    for line in probes_iter:
        probe = json.loads(line)
        props = probe["properties"]
        geometry = probe["geometry"]
        yield props["id"], props["time"], props["status"], geometry["coordinates"]

Step 3: Update probe record time

The probes represent historical data. Therefore, when you playback you will need to normalize the probes recorded time to sync with the time you send the request in order to achieve the effect of vehicles moving in real-time.

This example is a single threaded playback. If the simulated playback lags behind the probe data timing, then you will be provided a warning through the code detecting the lag and outputting a warning.

The SharedStreets trip-simulator generates one probe per second. This frequency is too high for most applications, and in real-world applications you will often see frequencies of 15 to 60 seconds or even less. You must decide if you want to add another iterator for sub-sampling the data.

def update_probe_record_time(probes_iter):
    """
    Modify all timestamps to be relative to the time this function was called.
    I.e. all timestamps will be equally spaced from each other but in the future.
    """
    new_simulation_start_time_utc_ms = datetime.datetime.now().timestamp() * 1000
    simulation_start_time_ms = None
    time_delta_recording_ms = None
    for i, (_id, time_ms, status, coordinates) in enumerate(probes_iter):
        if time_delta_recording_ms is None:
            time_delta_recording_ms = new_simulation_start_time_utc_ms - time_ms
            simulation_start_time_ms = time_ms
        simulation_lag_sec = (
            (
                datetime.datetime.now().timestamp() * 1000
                - new_simulation_start_time_utc_ms
            )
            - (simulation_start_time_ms - time_ms)
        ) / 1000
        if simulation_lag_sec > 2.0 and i % 10 == 0:
            print(f"Playback lags behind by {simulation_lag_sec} seconds.")
        time_ms += time_delta_recording_ms
        yield _id, time_ms, status, coordinates

Step 4: Playback probes
In this step, pack the probes into small batches and introduce the timing element into the simulation playback. The reason for placing them in batches is explained below in step 6.

def sleep(time_elapsed_in_batch_sec, last_sleep_time_sec):
    sleep_time = max(
        0.0,
        time_elapsed_in_batch_sec
        - (datetime.datetime.now().timestamp() - last_sleep_time_sec),
    )
    time.sleep(sleep_time)
    if sleep_time > 0.0:
        last_sleep_time_sec = datetime.datetime.now().timestamp()
    return last_sleep_time_sec


def playback_probes(
    probes_iter,
    batch_size=10,
    batch_window_size_sec=2.0,
):
    """
    Replays the probes in live mode.
    The function assumes, that the probes returned by probes_iter are sorted
    in ascending order with respect to the probe timestamp.
    It will either yield batches of size 10 or smaller batches if the timeout is reached.
    """
    last_probe_record_time_sec = None
    time_elapsed_in_batch_sec = 0
    last_sleep_time_sec = datetime.datetime.now().timestamp()
    batch = []
    # Creates two second windows and puts all the probes falling into
    # those windows into a batch. If the max. batch size is reached it will yield early.
    for _id, time_ms, status, coordinates in probes_iter:
        probe_record_time_sec = time_ms / 1000
        if last_probe_record_time_sec is None:
            last_probe_record_time_sec = probe_record_time_sec
        time_to_next_probe_sec = probe_record_time_sec - last_probe_record_time_sec
        if (time_elapsed_in_batch_sec + time_to_next_probe_sec) > batch_window_size_sec:
            last_sleep_time_sec = sleep(time_elapsed_in_batch_sec, last_sleep_time_sec)
            yield batch
            batch = []
            time_elapsed_in_batch_sec = 0
        time_elapsed_in_batch_sec += time_to_next_probe_sec
        batch.append((_id, time_ms, status, coordinates))
        if len(batch) == batch_size:
            last_sleep_time_sec = sleep(time_elapsed_in_batch_sec, last_sleep_time_sec)
            yield batch
            batch = []
            time_elapsed_in_batch_sec = 0
        last_probe_record_time_sec = probe_record_time_sec
    if len(batch) > 0:
        last_sleep_time_sec = sleep(time_elapsed_in_batch_sec, last_sleep_time_sec)
        yield batch

Step 5: Create the updates for the tracker

LOCAL_TIMEZONE = (
    datetime.datetime.now(datetime.timezone(datetime.timedelta(0))).astimezone().tzinfo
)

def convert_to_tracker_updates(probes_batch_iter):
    """
    Converts batches of probes in the format (id,time_ms,state,coordinates=(lon,lat))
    into batches ready for upload to the tracker.
    """
    for batch in probes_batch_iter:
        updates = []
        for _id, time_ms, _, coordinates in batch:
            # The boto3 location service client expects a datetime object for sample time
            dt = datetime.datetime.fromtimestamp(time_ms / 1000, LOCAL_TIMEZONE)
            updates.append({"DeviceId": _id, "Position": coordinates, "SampleTime": dt})
        yield updates

Step 6: Send the updates to the tracker
In the update_tracker function, you use the batch_update_device_position function of the Amazon Location Service Tracker API. This lets you send batches of up to 10 location updates to the tracker in one request. Batching updates is much more cost-effective than sending one-by-one. You pay for each call to batch_update_device_position. Therefore, batching can lead to a 10x cost reduction.

def update_tracker(batch_iter, location_client, tracker_name):
    """
    Reads tracker updates from an iterator and uploads them to the tracker.
    """
    for update in batch_iter:
        response = location_client.batch_update_device_position(
            TrackerName=tracker_name, Updates=update
        )
        if "Errors" in response and response["Errors"]:
            for error in response["Errors"]:
                print(error["Error"]["Message"])

Step 7: Putting it all together
The follow code is the main section that glues every part together. When using this, make sure to replace the variables probes_file_name and tracker_name with the actual probes file location and the name of the tracker created earlier.

if __name__ == "__main__":
    location_client = boto3.client("location")
    probes_file_name = "probes.json"
    tracker_name = "my-tracker"
    iterator = iter_probes_file(probes_file_name)
    iterator = parse_probes_trip_simulator(iterator)
    iterator = update_probe_record_time(iterator)
    iterator = playback_probes(iterator)
    iterator = convert_to_tracker_updates(iterator)
    update_tracker(
        iterator, location_client=location_client, tracker_name=tracker_name
    )

Paste all of the code listed in steps 1 to 7 into a file called trip_playback.py, then execute

python3 trip_playback.py

This will start the playback process.

Step 8: (Optional) Tracking a device’s position updates
Once the playback is running, verify that the updates are actually written to the tracker repeatedly querying the tracker for updates for a single device. Here, you will use the get_device_position function of the Amazon Location Service Tracker API to receive the last known device position.

import boto3
import time

def get_last_vehicle_position_from_tracker(
    device_id, tracker_name="your-tracker", client=boto3.client("location")
):
    response = client.get_device_position(DeviceId=device_id, TrackerName=tracker_name)
    if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
        print(str(response))
    else:
        lon = response["Position"][0]
        lat = response["Position"][1]
        return lon, lat, response["SampleTime"]
        
if __name__ == "__main__":   
    device_id = "my-device"     
    tracker_name = "my-tracker"
    while True:
        lon, lat, sample_time = get_last_vehicle_position_from_tracker(
            device_id=device_id, tracker_name=tracker_name
        )
        print(f"{lon}, {lat}, {sample_time}")
        time.sleep(10)

In the example above, you must replace the tracker_name with the name of the tracker created earlier and the device_id with the ID of one of the simulation vehicles. You can find the vehicle IDs in the probes.json file created by the SharedStreets trip-simulator. If you run the above code, then you should see the following output.

location probes data

AWS IoT Device Simulator

As an alternative, if you are familiar with AWS IoT, AWS has its own vehicle simulator that is part of the IoT Device Simulator solution. It lets you simulate a vehicle fleet moving on a road network. This has been described here. The simulator sends the location data to an Amazon IoT endpoint. The Amazon Location Service Developer Guide shows how to write and set-up a Lambda function to connect the IoT topic to the tracker.

The AWS IoT Device Simulator has a GUI and is a good choice for simulating a small number of vehicles. The drawback is that only a few trips are pre-packaged with the simulator and changing them is somewhat complicated. The SharedStreets Trip-simulator has much more flexibility, allowing simulations of fleets made up of a larger number of vehicles, but it has no GUI for controlling the playback or simulation.

Cleanup

You’ve created a Location Service Tracker resource. It does not incur any charges if it isn’t used. If you want to delete it, you can do so on the Amazon Location Service Tracker console.

Conclusion

This blog showed you how to use an open-source project and open-source data to generate simulated trips, as well as how to play those trips back to the Amazon Location Service Tracker. Furthermore, you have access to the AWS IoT Device Simulator, which can also be used for simulating vehicles.

Give it a try and tell us how you test your location-based applications in the comments.

About the authors

Florian Seidel

Florian is a Solutions Architect in the EMEA Automotive Team at AWS. He has worked on location based services in the automotive industry for the last three years and has many years of experience in software engineering and software architecture.

Aaron Sempf

Aaron is a Senior Partner Solutions Architect, in the Global Systems Integrators team. When not working with AWS GSI partners, he can be found coding prototypes for autonomous robots, IoT devices, and distributed solutions.