Artigo
· Nov. 26, 2023 9min de leitura

Enviando de mensagens Kafka via JAVA PEX para processamento de prognósticos de exames de quarentena.

Introdução

Este artigo tem como objetivo explorar como o sistema FHIR-PEX funciona e foi desenvolvido, aproveitando os recursos do InterSystems IRIS.

Agilizando a identificação e o processamento de exames médicos em centros de diagnóstico clínico, nosso sistema visa aumentar a eficiência e a precisão dos fluxos de trabalho de saúde. Ao integrar os padrões FHIR com o banco de dados Java-PEX da InterSystems IRIS, o sistema ajuda os profissionais de saúde com recursos de validação e roteamento, contribuindo, em última análise, para melhorar a tomada de decisões e o atendimento ao paciente.

como funciona

  • Interoperabilidade IRIS:
    Recebe mensagens no padrão FHIR, garantindo integração e compatibilidade com dados de saúde.

  • Processamento de informações com 'PEX Java':
    Processa mensagens no formato FHIR e as direciona para tópicos Kafka com base em regras configuradas globalmente no banco de dados, facilitando o processamento e roteamento eficiente de dados, principalmente para exames direcionados à quarentena.

  • Tratamento de devoluções Kafka via back-end Java externo:
    Interpreta apenas os exames direcionados à quarentena, permitindo que o sistema trate os retornos do Kafka por meio de um backend Java externo. Facilita a geração de insights prognósticos para profissionais de saúde através de IA Generativa, contando com consultas de resultados de exames anteriores dos respectivos pacientes.

 

Desenvolvimento

Através do PEX (Production EXtension) da InterSystems, ferramenta de extensibilidade que permite aprimoramento e customização do comportamento do sistema, elaboramos uma Operação de Negócio. Este componente tem a tarefa de processar mensagens recebidas no formato FHIR dentro do sistema.

Segue exemplo:

    import com.intersystems.enslib.pex.*;
    import com.intersystems.jdbc.IRISObject;
    import com.intersystems.jdbc.IRIS;
    import com.intersystems.jdbc.IRISList;
    import com.intersystems.gateway.GatewayContext;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.*;
    
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    public class KafkaOperation extends BusinessOperation {
    // Connection to InterSystems IRIS
    private IRIS iris;
    
    // Connection to Kafka
    private Producer<Long, String> producer;
    
    // Kafka server address (comma separated if several)
    public String SERVERS;
    
    // Name of our Producer
    public String CLIENTID;
    
    /// Path to Config File
    public String CONFIG;
    
    public void OnInit() throws Exception {
    [...]
    }
    
    public void OnTearDown() throws Exception {
    [...]
    }
    
    public Object OnMessage(Object request) throws Exception {
        IRISObject req = (IRISObject) request;
        LOGINFO("Received object: " + req.invokeString("%ClassName", 1));
    
        // Create record
        String value = req.getString("Text");
        String topic = getTopicPush(req);
        final ProducerRecord<Long, String> record = new ProducerRecord<>(topic, value);
    
        // Send new record
        RecordMetadata metadata = producer.send(record).get();
    
        // Return record info
        IRISObject response = (IRISObject)(iris.classMethodObject("Ens.StringContainer","%New",topic+"|"+metadata.offset()));
        return response;
    }
    
    private Producer<Long, String> createProducer() throws IOException {
    [...]
    }
    
    private String getTopicPush(IRISObject req) {
    [...]
    }
    
    [...]
    }
`

Dentro da aplicação, o método getTopicPush assume a responsabilidade de identificar o tópico para o qual a mensagem será enviada.

A determinação para qual tópico a mensagem será enviada depende da existência de uma regra na “quarantineRule” global, conforme lida dentro do IRIS.

    String code = FHIRcoding.path("code").asText();
    String system = FHIRcoding.path("system").asText();

    IRISList quarantineRule = iris.getIRISList("quarantineRule",code,system);

     String reference = quarantineRule.getString(1);
     String value = quarantineRule.getString(2);

     String observationValue = fhir.path("valueQuantity").path("value").asText()

Quando o ^quarantineRule global existe, a validação do objeto FHIR pode ser validada.

    private boolean quarantineValueQuantity(String reference, String value, String observationValue) {
        LOGINFO("quarantine rule reference/value: " + reference + "/" + value);
        double numericValue = Double.parseDouble(value);
        double numericObservationValue = Double.parseDouble(observationValue);

        if ("<".equals(reference)) {
            return numericObservationValue < numericValue;
        }
        else if (">".equals(reference)) {
            return numericObservationValue > numericValue;
        }
        else if ("<=".equals(reference)) {
            return numericObservationValue <= numericValue;
        }
        else if (">=".equals(reference)) {
            return numericObservationValue >= numericValue;
        }
        
        return false;
    }

Exemplo prático:

Ao definir um global, como:

    Set ^quarantineRule("59462-2","http://loinc.org") = $LB(">","500") 

Isso estabelece uma regra para o código "59462-2" e o sistema ""http://loinc.org"" na ^quarantineRule global, especificando uma condição onde o valor quando maior que 500 é definido como quarentena. No aplicativo, o método getTopicPush pode então usar essa regra para determinar o tópico apropriado para enviar a mensagem com base no resultado da validação.

Dada a atribuição, o JSON abaixo seria enviado para quarentena, pois corresponde à condição especificada por ter:

 {
          "system": "http://loinc.org",
          "code": "59462-2",
          "display": "Testosterone"
}

"valueQuantity": { "value": 550, "unit": "ng/dL", "system": "http://unitsofmeasure.org", "code": "ng/dL" }

FHIR Observation:

{
    "resourceType": "Observation",
    "id": "3a8c7d54-1a2b-4c8f-b54a-3d2a7efc98c9",
    "status": "final",
    "category": [
      {
        "coding": [
          {
            "system": "http://terminology.hl7.org/CodeSystem/observation-category",
            "code": "laboratory",
            "display": "laboratory"
          }
        ]
      }
    ],
    "code": {
      "coding": [
        {
          "system": "http://loinc.org",
          "code": "59462-2",
          "display": "Testosterone"
        }
      ],
      "text": "Testosterone"
    },
    "subject": {
      "reference": "urn:uuid:274f5452-2a39-44c4-a7cb-f36de467762e"
    },
    "encounter": {
      "reference": "urn:uuid:100b4a8f-5c14-4192-a78f-7276abdc4bc3"
    },
    "effectiveDateTime": "2022-05-15T08:45:00+00:00",
    "issued": "2022-05-15T08:45:00.123+00:00",
    "valueQuantity": {
      "value": 550,
      "unit": "ng/dL",
      "system": "http://unitsofmeasure.org",
      "code": "ng/dL"
    }
}

 Quarkus Java application

Após o envio para o tópico desejado, foi construída uma aplicação Quarkus Java para recebimento de exames em quarentena.

    @ApplicationScoped
     public class QuarentineObservationEventListener {

    @Inject
    PatientService patientService;

    @Inject
    EventBus eventBus;

    @Transactional
    @Incoming("observation_quarantine")
    public CompletionStage onIncomingMessage(Message quarentineObservationMessage) {
        var quarentineObservation = quarentineObservationMessage.getPayload();
        var patientId = quarentineObservation.getSubject()
                .getReference();
        var patient = patientService.addObservation(patientId, quarentineObservation);
        publishSockJsEvent(patient.getId(), quarentineObservation.getCode()
                .getText());
        return quarentineObservationMessage.ack();
    }

    private void publishSockJsEvent(Long patientId, String text) {
        eventBus.publish("monitor", MonitorEventDto.builder()
                .id(patientId)
                .message(" is on quarentine list by observation ." + text)
                .build());
    }
     }

Este segmento do sistema tem a tarefa de persistir as informações recebidas de Kafka, armazená-las nas observações do paciente no banco de dados e notificar a ocorrência ao monitor.

O monitor

Por fim, o monitor do sistema é responsável por fornecer uma visualização simples do front-end. Isso permite que os profissionais de saúde revisem os dados do paciente/exame e tomem as ações necessárias.

Implementação of langchain

Através do monitor, o sistema permite que os profissionais de saúde solicitem recomendações da IA ​​Generativa.

    @Unremovable
    @Slf4j
    @ApplicationScoped
    public class PatientRepository {
        @Tool("Get anamnesis information for a given patient id")
        public Patient getAnamenisis(Long patientId) {
            log.info("getAnamenisis called with id " + patientId);
            Patient patient = Patient.findById(patientId);
            return patient;
        }
    
        @Tool("Get the last clinical results for a given patient id")
        public List getObservations(Long patientId) {
            log.info("getObservations called with id " + patientId);
            Patient patient = Patient.findById(patientId);
            return patient.getObservationList();
        }
    
    }

segue implementação de langchain4j

    @RegisterAiService(chatMemoryProviderSupplier = RegisterAiService.BeanChatMemoryProviderSupplier.class, tools = {PatientRepository.class})
    public interface PatientAI {
    
        @SystemMessage("""
                You are a health care assistant AI. You have to recommend exams for patients based on history information.
                """)
        @UserMessage("""
                 Your task is to recommend clinical exams for the patient id {patientId}.
    
                 To complete this task, perform the following actions:
                 1 - Retrieve anamnesis information for patient id {patientId}.
                 2 - Retrieve the last clinical results for patient id {patientId}, using the property 'name' as the name of exam and 'value' as the value.
                 3 - Analyse results against well known conditions of health care.
    
                 Answer with a single JSON document containing:
                 - the patient id in the 'patientId' key
                 - the patient weight in the 'weight' key
                 - the exam recommendation list in the 'recommendations' key, with properties exam, reason and condition.
                 - the 'explanation' key containing an explanation of your answer, especially about well known diseases.
    
                Your response must be just the raw JSON document, without json, or anything else.
                 """)
        String recommendExams(Long patientId);
    }

Dessa forma, o sistema pode auxiliar os profissionais de saúde na tomada de decisões e na execução de ações.

Video demonstração 

VIDEO

Autores

NOTA: 

O aplicativo https://openexchange.intersystems.com/package/fhir-pex está participando atualmente do InterSystems Java Contest 2023. Sinta-se à vontade para explorar a solução e não hesite em entrar em contato se tiver alguma dúvida ou precisar de informações adicionais. Recomendamos executar o aplicativo em seu ambiente local para uma experiência prática.
Obrigado pela oportunidade 😀!

Artigo original: https://community.intersystems.com/post/sending-kafka-messages-java-pex-...

Discussão (0)1
Entre ou crie uma conta para continuar