Article
· Apr 23 · 6 min read

OMOP Odyssey - AWS HealthLake (Strait of Messina)

Nearline FHIR® Ingestion to InterSystems OMOP from AWS HealthLake

In this part of the OMOP Odyssey, before attempting to challenge Scylla, we reflect on how fortunate we are that the InterSystems OMOP transform is built on Bulk FHIR Export as the source payload. This opens up hands-off interoperability with the InterSystems OMOP transform across several FHIR® vendors, including Amazon Web Services HealthLake.

HealthLake Bulk FHIR Export
 

HealthLake supports Bulk FHIR import/export from the CLI or the API. The premise is simple and the docs are exhaustive, so we'll save a model the trouble of training on them again and just link them for the interested. The more valuable thing to understand about this section's heading is the implementation of the Bulk FHIR export standard itself.
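For orientation, starting an export from the CLI looks roughly like the sketch below; the datastore ID, bucket, KMS key, and role ARN are all placeholders for your own values.

aws healthlake start-fhir-export-job \
    --job-name FHIR2OMOPJob \
    --datastore-id <your-datastore-id> \
    --output-data-config '{"S3Configuration":{"S3Uri":"s3://<your-bucket>/export/","KmsKeyId":"<your-kms-key-arn>"}}' \
    --data-access-role-arn arn:aws:iam::<account>:role/<your-export-role>

The walkthrough below drives the same API through boto3 instead.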


Nearline?

Yes, only "nearline" ingestion: the HealthLake export covers the whole data store and has no incremental option. It also does not support a resource-based trigger, so it has to be invoked on an interval or by some other means not yet apparent to me at the resource-activity level. There are still a great number of ways to poke the export throughout AWS, and without incremental exports you only want it triggered within a tolerable processing window for the whole datastore anyway.

The Whole Datastore?

Yes, the job exports all of the resources into a flat structure. Re-importing the same data just to pick up the incremental additions may not be the cleanest process, but the InterSystems OMOP transform should handle it.
 

Walkthrough

To keep this short and to the point: the illustration below encapsulates how a scheduled Lambda can glue these two solutions together and automate your OMOP ingestion.



Step One, AWS: Create Bucket

Create a bucket with a few keys: one is shared with InterSystems OMOP for ingestion into the FHIR transformation, and the others support the automated export (see the sketch after the list below).


Explanations of the keys:

  • export - landing area for the raw resource ndjson from the job
  • from-healthlake-to-intersystems-omop - landing area for the created .zip and integration point with InterSystems OMOP
  • output - job output
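A minimal boto3 sketch of that layout, assuming the bucket name and region used later in this post (S3 has no real folders, so the empty objects simply make the prefixes visible in the console):

import boto3

s3 = boto3.client('s3')
bucket = 'intersystems-omop-fhir-bucket'

# LocationConstraint is required outside us-east-1
s3.create_bucket(
    Bucket=bucket,
    CreateBucketConfiguration={'LocationConstraint': 'us-east-2'}
)

# Pre-create the three prefixes used in this walkthrough
for key in ('export/', 'from-healthlake-to-intersystems-omop/', 'output/'):
    s3.put_object(Bucket=bucket, Key=key)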

Step Two, InterSystems OMOP

Create the deployment, providing the ARN of the bucket and the key from above, i.e., the `from-healthlake-to-intersystems-omop` key.

Snag the example policy from the post-configuration step as indicated and apply it to the bucket in AWS. There are some exhaustive examples of this in a previous post, OMOP Odyssey - InterSystems OMOP Cloud Service (Troy).
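I won't reproduce the exact policy here, since the configuration step generates it for you, but applying it with boto3 is roughly the sketch below; the principal ARN is a placeholder for whatever the deployment hands you.

import json
import boto3

policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        # placeholder principal: use the one from the InterSystems OMOP configuration step
        "Principal": {"AWS": "arn:aws:iam::<intersystems-account>:role/<deployment-role>"},
        "Action": ["s3:GetObject", "s3:ListBucket"],
        "Resource": [
            "arn:aws:s3:::intersystems-omop-fhir-bucket",
            "arn:aws:s3:::intersystems-omop-fhir-bucket/from-healthlake-to-intersystems-omop/*"
        ]
    }]
}

boto3.client('s3').put_bucket_policy(
    Bucket='intersystems-omop-fhir-bucket',
    Policy=json.dumps(policy)
)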

Step Three, AWS: Schedule a HealthLake Export in the Expected InterSystems OMOP Format 💫

The flow is explained in the code itself, but I will also lay it out here in the form of a prompt, so you can maybe land in the same spot with your own changes:

In Python, show me how to start a HealthLake export job to a target location and poll the status of the job until it is complete; then read all of the ndjson files it creates and zip them, without the relative path included in the zip, and upload the zip to another location in the same bucket; once the upload is complete, remove the exported files from the export job.

The resulting function and code are the following:

import json
import boto3
import uuid
import zipfile
import io
import os
import time


def lambda_handler(event, context):
    # AWS clients
    s3 = boto3.client('s3')
    client = boto3.client('healthlake')

    # Vars
    small_guid = uuid.uuid4().hex[:8]
    bucket_name = 'intersystems-omop-fhir-bucket'
    prefix = 'export/'  # Make sure it ends with '/'
    output_zip_key = 'from-healthlake-to-intersystems-omop/healthlake_ndjson_' + small_guid + '.zip'
    datastore_id = '9ee0e51d987e#ai#8ca487e8e95b1d'

    # Step 1: Start the HealthLake export job
    response = client.start_fhir_export_job(
        JobName='FHIR2OMOPJob',
        OutputDataConfig={
            'S3Configuration': {
                'S3Uri': 's3://intersystems-omop-fhir-bucket/export/',
                'KmsKeyId': 'arn:aws:kms:us-east-2:12345:key/54918bec-#ai#-4710-9c18-1a65d0d4590b'
            }
        },
        DatastoreId=datastore_id,
        DataAccessRoleArn='arn:aws:iam::12345:role/service-role/AWSHealthLake-Export-2-OMOP',
        ClientToken=small_guid
    )

    job_id = response['JobId']
    print(f"Export job started: {job_id}")

    # Step 2: Poll until the job reaches a terminal state
    while True:
        status_response = client.describe_fhir_export_job(
            DatastoreId=datastore_id,
            JobId=job_id
        )
        status = status_response['ExportJobProperties']['JobStatus']
        print(f"Job status: {status}")
        if status in ['COMPLETED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(10)  # wait before polling again

    if status == 'COMPLETED':
        output_uri = status_response['ExportJobProperties']['OutputDataConfig']['S3Configuration']['S3Uri']
        print(f"Export completed. Data available at: {output_uri}")

        # Step 3: Get the list of all objects with the .ndjson extension under the prefix
        ndjson_keys = []
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if key.endswith('.ndjson'):
                    ndjson_keys.append(key)

        # Create the ZIP in memory, flattening each key to its base name
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            for key in ndjson_keys:
                obj = s3.get_object(Bucket=bucket_name, Key=key)
                file_data = obj['Body'].read()
                arcname = os.path.basename(key)  # drop the relative path inside the zip
                zf.writestr(arcname, file_data)
        zip_buffer.seek(0)

        # Step 4: Upload the ZIP to the key shared with InterSystems OMOP
        s3.put_object(
            Bucket=bucket_name,
            Key=output_zip_key,
            Body=zip_buffer.getvalue()
        )
        print(f"Created ZIP with {len(ndjson_keys)} files at s3://{bucket_name}/{output_zip_key}")

        # Step 5: Clean up the raw export so the next run starts from an empty landing area
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            if 'Contents' in page:
                delete_keys = [
                    {'Key': obj['Key']}
                    for obj in page['Contents']
                    if obj['Key'] != prefix  # protect the folder key (e.g., 'export/')
                ]
                if delete_keys:
                    s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
                    print(f"Deleted {len(delete_keys)} objects under {prefix}")
            else:
                print(f"No objects found under {prefix}")
    else:
        print(f"Export job did not complete successfully. Status: {status}")

    return {
        'statusCode': 200,
        'body': json.dumps(response)
    }


This function fires about every 10 minutes via an EventBridge schedule; the interval will have to be adjusted to match your workload characteristics.
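For completeness, a sketch of wiring up that schedule with boto3; the rule name and Lambda ARN are placeholders, and you could just as easily click this together in the console.

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# placeholder ARN for the function defined above
lambda_arn = 'arn:aws:lambda:us-east-2:<account>:function:healthlake-export-to-omop'

# Fire roughly every 10 minutes; widen this if a full export takes longer
rule_arn = events.put_rule(
    Name='healthlake-omop-export-schedule',
    ScheduleExpression='rate(10 minutes)'
)['RuleArn']

events.put_targets(
    Rule='healthlake-omop-export-schedule',
    Targets=[{'Id': 'export-lambda', 'Arn': lambda_arn}]
)

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName=lambda_arn,
    StatementId='allow-eventbridge-schedule',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn
)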
 

Step Four, Validate Ingestion ✔

LGTM! We can see that the zips in the ingestion location are successfully picked up by the transform in InterSystems OMOP.

Step Five, Smoke Data ✔

LGTM! FHIR Organization Resource = OMOPCDM54 care_site.

Question
· Apr 23

IntegratedML

Hi Guys,

I'm a newbie who doesn't know much about IntegratedML and is looking for a first push into it. I've set up VS Code with my IRIS 2024.3 running on Linux, and my understanding is that we can create models using SQL. So first: do I need to set up a specific environment where I can run my SQL commands to create and train models, or can I just use the SMP? And do I need to install or enable Python or anything else to set up the environment?

Also, are there any easy samples or training materials on how to create, train, and deploy my model?

 

Thanks

InterSystems Official
· Apr 23

Maintenance releases 2024.1.4 and 2023.1.6 of InterSystems IRIS, IRIS for Health, and HealthShare HealthConnect are now available

Maintenance releases 2024.1.4 and 2023.1.6 of the InterSystems IRIS® data platform, InterSystems IRIS® for Health™, and HealthShare® Health Connect are now generally available (GA). These releases include the fixes for the recently issued alert: Alert: SQL Queries Returning Wrong Results | InterSystems. Please share your feedback through the Developer Community so that together we can build a better product.

Documentation

You can find detailed change lists and upgrade checklists on the following pages:

Early Access Programs (EAPs)

Many EAPs are currently available. Check out this page and sign up for the ones that interest you.

How to get the software

Full installation packages for InterSystems IRIS and InterSystems IRIS for Health are available from the WRC's "InterSystems IRIS Data Platform Full Kits" page. HealthShare Health Connect kits are available from the WRC's "HealthShare Full Kits" page. Container images are available from the InterSystems Container Registry.

Availability and package information

This release comes with classic installation packages for all supported platforms, as well as container images in Docker format. For the complete list, see the "Supported Platforms" document. The build numbers for these maintenance releases are 2024.1.4.512.0 and 2023.1.6.809.0.

InterSystems Official
· Apr 23

Announcing the 2024.1.4 and 2023.1.6 releases of InterSystems products

InterSystems has released maintenance versions 2024.1.4 and 2023.1.6 of the InterSystems IRIS® data platform, InterSystems IRIS® for Health™, and HealthShare® Health Connect. These releases include the fix for the previously announced Alert: SQL Queries Returning Wrong Results. Please share your feedback through the Developer Community to help us improve the product.

Documentation

Detailed change lists and upgrade checklists can be found in the following documents (all in English):

✅ 2024.1.4

✅ 2023.1.6

Early Access Programs (EAPs)

Many Early Access Programs are available. You can sign up for any that interest you from this page.

How to get the kits

Conventional installer-package kits for InterSystems IRIS and InterSystems IRIS for Health are available from the IRIS download page on WRC Direct, and HealthShare Health Connect kits from the HealthShare download page. Container images are available from the InterSystems Container Registry.

Available package information

This release provides both the conventional installer-package format and container images. For the full list, see the 2024.1 Supported Platforms page and the 2023.1 Supported Platforms page (both in English).

The build numbers for these maintenance releases are 2024.1.4.512.0 and 2023.1.6.809.0.

Article
· Apr 22 · 7 min read

Set, Get, and Don't Fret with JSON

The Good Old Days

The %Library.DynamicObject class has been in IRIS since before it became IRIS. If you have been using it since the Caché days, you may want to brush up on some of its changes.

In Caché 2018, the %Get method took only one argument: the key to retrieve from the JSON. That meant that if your JSON object, called myObj, looked like the following:

{
    "mybool":true,
    "mynum":1234,
    "mystring":"Hello World!"
}

Calling myObj.%Get("mybool") would return 1, myObj.%Get("mynum") would return 1234, and myObj.%Get("mystring") would return the string "Hello World!"

Setting those properties, on the other hand, required a bit more work. For instance, setting a JSON property to 0 could mean the number 0, a boolean value meaning false, or the literal string "0". This is why the %Set method has always had a third, optional argument. To create the above-mentioned JSON object, we could use the following code:

set myObj = ##class(%Library.DynamicObject).%New()
do myObj.%Set("mybool",1,"boolean")
do myObj.%Set("mynum",1234,"number")
do myObj.%Set("mystring","Hello World!","string")

In this case, we could also leave the third argument out of the last two calls: 1234 would be recognized as a number since it is not quoted, and "Hello World!" would be identified as a string because it is quoted. If we wanted to add the value 1234 to the object as a string, we could pass "string" as the type instead. We could also specify the "null" type; in that case, the value must be "". In practice, however, we often set these values from variables in our ObjectScript code, so it may be better to specify this argument, in case the variable holds a string where we expect a number or vice versa, to ensure that our JSON arrives at its destination encoded correctly.
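For example, here is a quick sketch (the variable and key names are illustrative) of forcing the type when a variable happens to hold a quoted value:

set val = "1234"
// without the third argument, the quoted variable would land in the JSON as a string
do myObj.%Set("asNumber", val, "number")
do myObj.%Set("asString", val, "string")
write myObj.%ToJSON()
// {"asNumber":1234,"asString":"1234"}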

How I Learned to Stop Worrying About <MAXSTRING> and Love JSON

As the wise Billy Joel once said, "the good old days weren't always good, and tomorrow ain't as bad as it seems." The type list for %Set has grown, and the %Get method has acquired a couple of new arguments. Crucially, both now support "stream" as a type. If you have ever handled receiving a lot of JSON data, you have probably seen a <MAXSTRING> error at some point, when a value contained in the JSON was longer than a string in IRIS is allowed to be. The new %Get lets you specify two more arguments: a default value and a type. Your older code has not stopped working, though, because those two arguments are optional; if omitted, the method works precisely as it did in 2018. The default value is returned if nothing is found for the given key. The type works like the type argument of the %Set method: you can specify what data type you are retrieving. Consider the following try/catch block:

try {
    set mydata = myObj.%Get("mydata","N/A")
}
catch ex {
    if ex.Name = "<MAXSTRING>" {
        set mydata = myObj.%Get("mydata",,"stream")
    }
}

It will attempt to set mydata to the value located under "mydata" in the JSON object. If that item does not exist, it will return "N/A" instead. If that item is too long for a string, the system will throw an exception to the catch block. We should check the name of that exception, since if a different exception occurred, it would not make sense to try to get the data as a stream either. You can read more on exception handling here. If it is <MAXSTRING>, we specify that we want to retrieve mydata as a stream instead. Retrieving the data as a stream returns an object of the class %Stream.DynamicCharacter. It will never trigger a <MAXSTRING> exception, although it may throw a <STORE> exception if the process's memory limit is exceeded.

If you take the approach described above, you will not know whether mydata in the code is a string or a stream. That means you will have to follow up with code similar to the one below:

if $ISOBJECT(mydata){
    //Place handling for streams here
}
else{
    //Place handling for strings here
}

You could also use the stream option every single time, guaranteeing that you always have a stream to work with. However, that would create unnecessary resource usage and overhead in your code.

Another option is to add a stream to a dynamic object using %Set. Check out the following example:

set mystream = ##class(%Stream.FileBinary).%New()
do mystream.LinkToFile("/path/to/your/file")
do myObj.%Set("mydata",mystream,"stream")

The data in your file will now go into the mydata field of your dynamic object.

%Set and %Get can also encode and decode strings using Base64 encoding.

Always keep in mind that Base64 is encoding, not encryption! There are no secret keys or passphrases to decode your message, and it is easily reversible. Therefore, you should still use an encrypted protocol, e.g., TLS or HTTPS, for transmission! Base64 is used to transmit non-ASCII characters in a way that allows ASCII-only systems to receive them and pass them along.

With that crucial side note out of the way, we can finally look at how this works. If we make one small change to the previous code sample, the contents of the file stream will become Base64 encoded.

set mystream = ##class(%Stream.FileBinary).%New()
do mystream.LinkToFile("/path/to/your/file")
do myObj.%Set("mydata",mystream,"stream>base64")

On the other hand, if the data in the file were already Base64 encoded and we wanted to store the decoded data, we would only need to change one character.

set mystream = ##class(%Stream.FileBinary).%New()
do mystream.LinkToFile("/path/to/your/file")
do myObj.%Set("mydata",mystream,"stream<base64")

The greater-than and less-than signs always point in the direction of the conversion. If we convert an unencoded stream to Base64, the sign points toward the base64. If we convert a Base64-encoded stream to an unencoded stream, the sign points from the base64 to the stream. The same functionality exists for strings, using string>base64 and string<base64 as the type argument. We can use these type arguments with the %Get method as well. Note that when we do, the default value we provide will not be converted; it is returned literally. Consider the following:

set something = myObj.%Get("something","NA","string>base64")

If the "something" item exists, it will be returned in its Base64-encoded form. However, if it does not exist, "NA" will be returned without being encoded.

There is one caveat to the Base64 encoding option: only characters with a code between 0 and 255 can be encoded in Base64. Character codes greater than 255 will result in a <WIDE CHAR> exception. For example, the following lines will cause that exception:

set mychar = $C(256)
do myObj.%Set("mychar",mychar,"string>base64")

So, I Heard You Like JSON . . .

Sometimes there is JSON in your JSON. The default way of handling it is usually the one you would choose, but another option has been added to the type argument to deal with a different use case: the "json" type. Look at the following JSON object:

{
    "mystring":"Hello World!",
    "mynumber":1234,
    "myjson":{
        "subitem1":"Hello Mars!",
        "subitem2":"Hello Stars!"
    }
}

As a rule, when you run into this, you will call the %Get method with the key "myjson" to get a dynamic object. Look at the example below:

set myjson = myObj.%Get("myjson")
write myjson.%Get("subitem1")

The line above would write "Hello Mars!", which is the most common use case in this situation. Yet there may be situations when you would prefer to get the actual JSON contained within that item as a string. In that case, we can do the following:

set myjson = myObj.%Get("myjson",,"json")
write myjson

It will write out the JSON string exactly the way it is:

{"subitem1":"Hello Mars!","subitem2":"Hello Stars!"}

It can come in handy when we want to pass the JSON along as-is to another process. Note that, unlike all the other new types, this one is supported only by the %Get method, not the %Set method.

Hip, Hip, Array!

We have been discussing these new options in the context of the %Library.DynamicObject class so far, but they are also supported by the %Library.DynamicArray class, where %Set and %Get take the same type arguments. The dynamic array class has an additional %Push method, though; it supports the same types as %Set, excluding the "json" type.
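For instance, a quick sketch of %Push with the type argument, reusing the mystream file stream from earlier:

set myArray = ##class(%Library.DynamicArray).%New()
do myArray.%Push("Hello World!","string")
do myArray.%Push(1234,"number")
do myArray.%Push(mystream,"stream")
write myArray.%ToJSON()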

Without further ado, it is probably the right time to review your older code and put these changes to good use!
