The Azure Cosmos DB Blog

All about Azure Cosmos DB and Azure

Category: Articles (page 1 of 8)

Saving Medical Device to Cloud messages as HL7 FHIR Observation resources

In this tutorial we are going to learn how to get our Medical Device to Cloud data.

   Note:
This tutorial is from a chapter in the HL7 FHIR on Azure – Device Framework eBook.

The following are the Requirements.

Requirements

  • Ability to capture our Medical Device output data.
  • Ability to filter the data.
  • Ability to update the Observation Resource document with the output data.
  • Ability to modify the EndPoint Response after updating the Observation Resource document.
  • Ability to parse the Observation Resource document and post it to any EndPoint.

Solution

We are going to create a Function App, with a EventHubTrigger to do the following:

  1. Loop through the EventHub messages.
  2. Create a EventGrid message.
  3. Post the EventGrid message.

Next we will create a Function App, DeviceGrid that handles the updating of the Observation Resource document.

  1. Subscribes to our EventGrid messages
  2. Upserts the Observation Resource document with the data from the EventGrid messages

Leverage Azure API Management Policies.

  1. Publish DeviceGrid Function App to Azure API Management.
  2. Create a policy that subscribes to the EventGrid topic.
  3. Extract specific device data and post it to any EndPoint.

Function Apps

EventHubTrigger – Function App

We are using an EventHubTrigger to loop through the EventHub Messages.
Then we create a EventGrid message and publish it.

Source Code
#region

using System;
using System.Configuration;
using System.Globalization;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Web;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.ServiceBus;
using Newtonsoft.Json;

#endregion

namespace DeviceMetrics
{
    public static class DeviceToCloud
    {
        private static readonly string TopicEndpoint = ConfigurationManager.AppSettings["topicEndpoint"];
        private static readonly string TopicKey = ConfigurationManager.AppSettings["topicKey"];

        [FunctionName("DeviceToCloud")]
        public static void Run([EventHubTrigger("DeviceToCloud", Connection = " EventHubConnectionString")]
            string[] myEventHubMessages, TraceWriter log)
        {
            log.Info($"C# Event Hub trigger function processed a message: {myEventHubMessages.Length}");

            foreach (var msg in myEventHubMessages)
            {
                dynamic data = JsonConvert.DeserializeObject(msg);

                var eventGridMessage = new GridEvent
                {
                    Id = Guid.NewGuid().ToString(),
                    EventTime = DateTime.UtcNow,
                    EventType = data.Properties.EventType, 
                    Data = data.Properties.Data,
                    Subject = data.Properties.Name,
                    Topic = data.Properties.Topic,
                    DeviceId = data.Id
                };


                try
                {
                    SendEvent(TopicEndpoint, TopicKey, eventGridMessage).ConfigureAwait(false);
                }
                catch (HttpRequestException e)
                {
                    log.Error(e.Message, e);
                }
            }
        }


        public static async Task SendEvent(string topicEndpoint, string topicKey, object data)
        {
            // Create a SAS token for the call to the event grid. We can do this with 
            // the SAS key as well but wanted to show an alternative.
            var sas = CreateEventGridSasToken(topicEndpoint, DateTime.Now.AddDays(1), topicKey);

            // Instantiate an instance of the HTTP client with the 
            // event grid topic endpoint.
            var client = new HttpClient {BaseAddress = new Uri(topicEndpoint)};

            // Configure the request headers with the content type
            // and SAS token needed to make the request.
            client.DefaultRequestHeaders.Accept.Clear();
            client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
            client.DefaultRequestHeaders.Add("aeg-sas-token", sas);

            // Serialize the data
            var json = JsonConvert.SerializeObject(data);
            var stringContent = new StringContent(json, Encoding.UTF8, "application/json");

            // Publish grid event
            await client.PostAsync(string.Empty, stringContent);
        }


        private static string CreateEventGridSasToken(string resourcePath, DateTime expirationUtc, string topicKey)
        {
            const char resource = 'r';
            const char expiration = 'e';
            const char signature = 's';

            // Encode the topic resource path and expiration parameters
            var encodedResource = HttpUtility.UrlEncode(resourcePath);
            var encodedExpirationUtc = HttpUtility.UrlEncode(expirationUtc.ToString(CultureInfo.InvariantCulture));

            // Format the unsigned SAS token
            var unsignedSas = $"{resource}={encodedResource}&{expiration}={encodedExpirationUtc}";

            // Create an HMCASHA256 policy with the topic key
            using (var hmac = new HMACSHA256(Convert.FromBase64String(topicKey)))
            {
                // Encode the signature and create the fully signed URL with the
                // appropriate parameters.
                var bytes = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(unsignedSas)));
                var encodedSignature = HttpUtility.UrlEncode(bytes);
                var signedSas = $"{unsignedSas}&{signature}={encodedSignature}";

                return signedSas;
            }
        }
    }
}

DeviceEventGrid – Function App

Source Code

#region

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

#endregion

namespace DeviceMetrics
{
    public static class DeviceEventGrid
    {
        private static readonly string Endpoint = ConfigurationManager.AppSettings["endpoint"];
        private static readonly string AuthKey = ConfigurationManager.AppSettings["authKey"];
        private static readonly string Database = ConfigurationManager.AppSettings["database"];
        private static readonly string Collection = ConfigurationManager.AppSettings["collection"];

        private static readonly DocumentClient Client = new DocumentClient(new Uri(Endpoint), AuthKey,
            new ConnectionPolicy
            {
                ConnectionMode = ConnectionMode.Direct,
                ConnectionProtocol = Protocol.Tcp
            }
        );


        [FunctionName("DeviceEventGrid")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post")]
            HttpRequestMessage req,
            TraceWriter log)
        {
            log.Info("C# HTTP trigger function processed a request.");


            var jsonContent = await req.Content.ReadAsStringAsync();
            var gridEvent = JsonConvert.DeserializeObject<List<GridEvent<Dictionary<string, string>>>>(jsonContent)
                ?.SingleOrDefault();

            if (gridEvent == null) return req.CreateErrorResponse(HttpStatusCode.BadRequest, $@"Missing event details");

            var gridEventType = req.Headers.GetValues("Aeg-Event-Type").FirstOrDefault();

            if (gridEventType == "SubscriptionValidation")
            {
                // Retrieve the validation code and echo back.
                var validationCode = gridEvent.Data["validationCode"];
                var validationResponse = JsonConvert.SerializeObject(new
                {
                    validationResponse = validationCode
                });

                return new HttpResponseMessage
                {
                    StatusCode = HttpStatusCode.OK,
                    Content = new StringContent(validationResponse)
                };
            }

            if (gridEventType == "Notification")
            {
                // Get the Observation Resource Document
                dynamic doc = await Client.ReadDocumentAsync(UriFactory.CreateDocumentUri(Database, Collection,
                    gridEvent.Data["deviceId"]));


                var valueQuanity = new ValueQuantity
                {
                    Code = doc.Component.ValueQuantity.Code,
                    System = doc.Component.ValueQuantity.System,
                    Unit = gridEvent.Data["unit"],
                    Value = gridEvent.Data["value"]
                };

                var component = new Component
                {
                    Code = doc.Component.Code,
                    ValueQuantity = valueQuanity
                };


                // partial values 
                var observation = new CompoundNumericObservation
                {
                    Id = doc.Id,
                    Component = {[0] = component},
                    EffectiveDateTime = new DateTimeOffset(DateTime.Parse(gridEvent.Data["timestamp"]))
                };


                try
                {
                    var result =
                        await Client.UpsertDocumentAsync(UriFactory.CreateDocumentUri(Database, Collection, gridEvent.Data["deviceId"]),
                            observation);

                    return req.CreateResponse(result.StatusCode);
                }
                catch (DocumentClientException e)
                {
                    var resp = new HttpResponseMessage
                    {
                        StatusCode = (HttpStatusCode) e.StatusCode,
                        Content = new StringContent(e.Message)
                    };
                    return resp;
                }
            }

            return req.CreateErrorResponse(HttpStatusCode.BadRequest,
                $@"Unknown request type");
        }

        public class GridEvent<T> where T : class
        {
            public string Id { get; set; }
            public string EventType { get; set; }
            public string Subject { get; set; }
            public DateTime EventTime { get; set; }
            public T Data { get; set; }
            public string Topic { get; set; }
        }
    }
}

Subscribing to EventGrid Events Using Azure API Management

After we pubish our Function App to Api Management, we will create a new Policy for our Function App. This allows us to extract specific device data post it to any EndPoint.

The following is an example policy.

<policies>
  <inbound>
    <base />
    <set-variable value="@(context.Request.Headers["Aeg-Event-Type"].Contains("SubscriptionValidation"))" name="isEventGridSubscriptionValidation" />
    <set-variable value="@(context.Request.Headers["Aeg-Event-Type"].Contains("Notification"))" name="isEventGridNotification" />
    <choose>
      <when condition="@(context.Variables.GetValueOrDefault<bool>("isEventGridSubscriptionValidation"))">
        <return-response>
          <set-status code="200" reason="OK" />
          <set-body>@{
            var events = context.Request.Body.As<string>();
            JArray a = JArray.Parse(events);
            var eventGridData = a.First["data"];
            var validationCode = eventGridData["validationCode"];
            var jOutput =
              new JObject(
                new JProperty("validationResponse", validationCode)
                );
            return jOutput.ToString();
          }</set-body>
        </return-response>
      </when>
      <when condition="@(context.Variables.GetValueOrDefault<bool>("isEventGridNotification"))">
        <send-one-way-request mode="new">
          <set-url>https://XXXXXXXXXXXXXXXXXXXXXX}</set-url>
          <set-method>POST</set-method>
          <set-body>@{
            var events = context.Request.Body.As<string>();
            JArray a = JArray.Parse(events);
            var eventGridData = a.First["data"];
            var deviceId = eventGridData["deviceId"];
            var deviceData = eventGridData["deviceData"];
            var patientd = eventGridData["patientd"];
            return new JObject(
                new JProperty("deviceId", deviceId),
                new JProperty("data", deviceData),
                new JProperty("patientd", patientd)                ;
          }</set-body>
        </send-one-way-request>
      </when>
    </choose>
  </inbound>
  <backend>
    <base />
  </backend>
  <outbound>
    <base />
  </outbound>
  <on-error>
    <base />
  </on-error>
</policies>

Summary

  • We are able to leverage the use an Azure EventGrid to create an EventGrid message.
  • We are using a Function App to loop through the EventHub messages.
  • We are able to create and publish EventGrid messages within the Function App.
  • We created another Function App that subscribes to our EventGrid Topic.
  • This Function App upserts the H7 FHIR Observation Resource with the ValueQuantity data.
  • We are using a Azure API management Policy to parse the ValueQuantity data and post it to any EndPoint.

Next Steps

  • We are going to create a Durable Function to orchestrate the message flow

Using Azure Cosmos DB and Function Apps for updating Medical Devices from HL7 FHIR

star This is a sample chapter from a free eBook I am co-authoring with Cory G. Stevenson, Data Solution Architect & Data Scientist with Microsoft.
The eBook is based upon the HL7 FHIR on Azure Device Framework from the HL7 FHIR on Azure eBook available here Downlaod


Recently we were brought in to design and develop an IoT solution for an Medical Device manufacturer. Their devices are used through out the world.

Business Requirements

The Software Update Service is to be one of several Microservices that will comprise the complete IoT System. These services include Provisioning, Management, Logging, Analysis, Monitoring and Security

INFO: The chapter before this one, is about Configuring our Azure Cosmos DB Data Repository for HL7 FHIR Resources.

Functional Requirements

  • The service would support an Multi-Tenancy Architecture.
  • Ability to have the software updates as close as possible to their customers Medical Device System (MDS), Virtual Medical Device (VMD) or a Channel.
  • A Form-Post from a web application will be used to send the software update file, along with metadata to the Software Update Service.
  • All Device Data was to be stored as HL7 FHIR Resources.
  • All data must be encrypted at Rest and Intransit.

NOTE: There are three HL7 FHIR Medical Device Resources: Device, DeviceMetric, and DeviceComponent.

In FHIR, the “Device” is the “administrative” resource for the device (it does not change much and has manufacturer information etc.), whereas the DeviceComponent and DeviceMetric (which is really a kind of DeviceComponent) model the physical part, including operation status and is much more volatile.

The physical composition of a Device is done by the DeviceComponents pointing to their “parent” component using DeviceComponent.parent. All components point to the “logical” Device they belong to, using DeviceComponent.source.

HL7 FHIR has a REST API. The API is hosted in an Azure Web Application. It can be used by Practitioners and Healthcare EHR systems to query the Device Resources.

Solution

INFO: Device, DeviceMetric, DeviceComponent documents have been created from the Device was Provisioned.

We decided to create an Azure Microservice for the complete Software Update Service. This would provide support for multi-tenancy.

  • An Azure Function App would receive the software update from a Form-Data POST.
  • An Azure Durable Function would send the software update to the Medical Device System (MDS), Virtual Medical Device (VMD) or a Channel.
  • Azure Cosmos DB SQL API would be used as an HL7 FHIR Server.
  • Azure Cosmos DB would be used to store the software update file as an Document Attachment to the FHIR DeviceComponent Resource.
  • The Software Update files would be stored in Azure Content Delivery Network (CDN) using Azure Blob Storage. The files will be cached at physical nodes in the United States, Europe, Asia, Australia, and South America.
  • Our Cosmos DB Database Account will use Global Distribution to the regions in the United States, Europe, Asia, Australia, and South America.
  • All the Azure Services support data encryption both at Rest and Intransit.

INFO: The setup and configuration of the HL7 FHIR on Azure Cosmos DB, along with the HL7 FHIR Rest Api is outside the scope of the solution.

Architecture

Posting Device Software Update to HL7 FHIR Server (Azure Cosmos DB)

Form-Data


Steps

  1. Web App does a Form-Data POST to Function App.
  2. Function App Inserts Software Update file in Azure Blob Storage.
  3. Function App updates HL7 FHIR DeviceComponent Resource (document).
  4. Function App returns Response to Web App.

FormData Function App

Source Code


#region

using System;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using HttpMultipartParser;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage;
using Attachment = Microsoft.Azure.Documents.Attachment;

#endregion

namespace CosmosDBFormData
{
    public static class FormData
    {
        private static readonly string Endpoint = ConfigurationManager.AppSettings["endpoint"];
        private static readonly string AuthKey = ConfigurationManager.AppSettings["authKey"];
        private static readonly string Database = ConfigurationManager.AppSettings["database"];
        private static readonly string Collection = ConfigurationManager.AppSettings["collection"];
    
        private static readonly string ConnectionString = ConfigurationManager.AppSettings["storageConnectionString"];
        private static readonly string BlobContainer = ConfigurationManager.AppSettings["blobContainer"];
        private static readonly string CdnUrl = ConfigurationManager.AppSettings["cdnUrl"];

        private static readonly DocumentClient Client = new DocumentClient(new Uri(Endpoint), AuthKey,
            new ConnectionPolicy
            {
                ConnectionMode = ConnectionMode.Direct,
                ConnectionProtocol = Protocol.Tcp
            }
        );
        

        [FunctionName("FormData")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]
            HttpRequestMessage req, TraceWriter log)
        {
        
            var storageAccount = CloudStorageAccount.Parse(ConnectionString);
            var cloudBlobClient = storageAccount.CreateCloudBlobClient();
            var cloudBlobContainer = cloudBlobClient.GetContainerReference(BlobContainer);
            var data = req.Content.ReadAsStreamAsync();
            var parser = new MultipartFormDataParser(data.Result);
            var deviceId = parser.GetParameterValue("DeviceId");
            var file = parser.Files.FirstOrDefault();
            
            if (file == null)
            {
            
                return req.CreateErrorResponse(HttpStatusCode.BadRequest, "The Attachment File is missing");
            }

            var fileName = file.FileName;
            var attachment = file.Data;
            var feedOptions = new FeedOptions {MaxItemCount = 1};

            try
            {
                // Get the DeviceComponent document
                var doc = Client.CreateDocumentQuery(UriFactory.CreateDocumentCollectionUri(Database, Collection),
                    $"SELECT * FROM d WHERE d.resourceType = \'DeviceComponent\' AND d.source.reference = \' Device/\'{deviceId}", feedOptions);

                Document d = doc.FirstOrDefault();

                try
                {
                    var cloudBlockBlob = cloudBlobContainer.GetBlockBlobReference(fileName);

                    try
                    {
                        await cloudBlockBlob.UploadFromStreamAsync(attachment);
                    }
                    catch (StorageException ex)
                    {
                        return req.CreateErrorResponse(HttpStatusCode.NotFound, ex.Message);
                    }
                    try
                    {
                        await Client.CreateAttachmentAsync(d.Id, new Attachment
                        {
                            Id = fileName,
                            ContentType = file.ContentType,
                            MediaLink = $"{CdnUrl}/{fileName}"
                        });
                        try
                        {
                            var update = await Client.UpsertDocumentAsync(UriFactory.CreateDocumentCollectionUri(Database, Collection), d);

                            return req.CreateResponse(HttpStatusCode.OK,update.Resource.Id);
                        }
                        catch (DocumentClientException ex)
                        {
                            var statusCode = ex.StatusCode;
                            return req.CreateErrorResponse((HttpStatusCode) statusCode, ex.Message);
                        }
                    }
                    catch (DocumentClientException ex)
                    {
                        var statusCode = ex.StatusCode;
                        return req.CreateErrorResponse((HttpStatusCode) statusCode, ex.Message);
                    }
                }
                catch (DocumentClientException ex)
                {
                    var statusCode = ex.StatusCode;
                    return req.CreateErrorResponse((HttpStatusCode) statusCode, ex.Message);
                }
            }
            catch (DocumentClientException ex)
            {
                var statusCode = ex.StatusCode;
                return req.CreateErrorResponse((HttpStatusCode) statusCode, ex.Message);
            }
        }
    }
}

HL7 FHIR Resources for Medical Devices

Device Resource Document Template

{
  "resourceType" : "Device",
  // from Resource: id, meta, implicitRules, and language
  // from DomainResource: text, contained, extension, and modifierExtension
  "identifier" : [{ Identifier }], // Instance identifier
  "udi" : { // Unique Device Identifier (UDI) Barcode string
    "deviceIdentifier" : "<string>", // Mandatory fixed portion of UDI
    "name" : "<string>", // Device Name as appears on UDI label
    "jurisdiction" : "<uri>", // Regional UDI authority
    "carrierHRF" : "<string>", // UDI Human Readable Barcode String
    "carrierAIDC" : "<base64Binary>", // UDI Machine Readable Barcode String
    "issuer" : "<uri>", // UDI Issuing Organization
    "entryType" : "<code>" // barcode | rfid | manual +
  },
  "status" : "<code>", // active | inactive | entered-in-error | unknown
  "type" : { CodeableConcept }, // What kind of device this is
  "lotNumber" : "<string>", // Lot number of manufacture
  "manufacturer" : "<string>", // Name of device manufacturer
  "manufactureDate" : "<dateTime>", // Date when the device was made
  "expirationDate" : "<dateTime>", // Date and time of expiry of this device (if applicable)
  "model" : "<string>", // Model id assigned by the manufacturer
  "version" : "<string>", // Version number (i.e. software)
  "patient" : { Reference(Patient) }, // Patient to whom Device is affixed
  "owner" : { Reference(Organization) }, // Organization responsible for device
  "contact" : [{ ContactPoint }], // Details for human/organization for support
  "location" : { Reference(Location) }, // Where the resource is found
  "url" : "<uri>", // Network address to contact device
  "note" : [{ Annotation }], // Device notes and comments
  "safety" : [{ CodeableConcept }] // Safety Characteristics of Device
}

Device Component Document Template

{
  "resourceType" : "DeviceComponent",
  // from Resource: id, meta, implicitRules, and language
  // from DomainResource: text, contained, extension, and modifierExtension
  "identifier" : { Identifier }, // R!  Instance id assigned by the software stack
  "type" : { CodeableConcept }, // R!  What kind of component it is
  "lastSystemChange" : "<instant>", // Recent system change timestamp
  "source" : { Reference(Device) }, // Top-level device resource link
  "parent" : { Reference(DeviceComponent) }, // Parent resource link
  "operationalStatus" : [{ CodeableConcept }], // Current operational status of the component, for example On, Off or Standby
  "parameterGroup" : { CodeableConcept }, // Current supported parameter group
  "measurementPrinciple" : "<code>", // other | chemical | electrical | impedance | nuclear | optical | thermal | biological | mechanical | acoustical | manual+
  "productionSpecification" : [{ // Specification details such as Component Revisions, or Serial Numbers
    "specType" : { CodeableConcept }, // Type or kind of production specification, for example serial number or software revision
    "componentId" : { Identifier }, // Internal component unique identification
    "productionSpec" : "<string>" // A printable string defining the component
  }],
  "languageCode" : { CodeableConcept } // Language code for the human-readable text strings produced by the device
}

Device Metric Document Template

{
  "resourceType" : "DeviceMetric",
  // from Resource: id, meta, implicitRules, and language
  // from DomainResource: text, contained, extension, and modifierExtension
  "identifier" : { Identifier }, // R!  Unique identifier of this DeviceMetric
  "type" : { CodeableConcept }, // R!  Identity of metric, for example Heart Rate or PEEP Setting
  "unit" : { CodeableConcept }, // Unit of Measure for the Metric
  "source" : { Reference(Device) }, // Describes the link to the source Device
  "parent" : { Reference(DeviceComponent) }, // Describes the link to the parent DeviceComponent
  "operationalStatus" : "<code>", // on | off | standby | entered-in-error
  "color" : "<code>", // black | red | green | yellow | blue | magenta | cyan | white
  "category" : "<code>", // R!  measurement | setting | calculation | unspecified
  "measurementPeriod" : { Timing }, // Describes the measurement repetition time
  "calibration" : [{ // Describes the calibrations that have been performed or that are required to be performed
    "type" : "<code>", // unspecified | offset | gain | two-point
    "state" : "<code>", // not-calibrated | calibration-required | calibrated | unspecified
    "time" : "<instant>" // Describes the time last calibration has been performed
  }]
}

Summary

  • You have seen how easy it is to use an Azure Function App to receive a Form-Post.
  • You have learned how to save the Software Update file to CDN as an Cosmos DB Document Attachment.

Next Step

  • You will see how we used Azure Durable Functions to publish the Software Updates to the Azure IoT Hubs.
Older posts
%d bloggers like this:
Skip to toolbar