Artigo
· jan 4, 2024 8min de leitura

Enviando mensagens do Kafka via JAVA PEX para processamento de prognósticos de exames de quarentena

Introdução

Este artigo busca explorar o funcionamento e desenvolvimento do sistema FHIR-PEX, aproveitando os recursos do InterSystems IRIS.

Otimizando a identificação e o processamento de exames médicos nos centros de diagnóstico clínico, nosso sistema visa aumentar a eficiência e precisão dos fluxos de trabalho de saúde. Ao integrar os padrões FHIR ao banco de dados Java-PEX do InterSystems IRIS, o sistema ajuda os profissionais de saúde com recursos de validação e roteamento, melhorando, em última análise, a tomada de decisões e o cuidado dos pacientes.

como funciona

  • Interoperabilidade IRIS:
    Recebe mensagens no padrão FHIR, garantindo a integração e a compatibilidade com os dados da saúde.

  • Processamento de informações com "PEX Java":
    Processa mensagens no formato FHIR e as direciona a tópicos do Kafka com base nas regras configuradas globalmente no banco de dados, possibilitando um processamento e roteamento de dados eficiente, especialmente para exames direcionados à quarentena.

  • Tratamento de retornos do Kafka via back-end Java externo:
    Interpreta apenas os exames direcionados à quarentena, permitindo que o sistema trate os retornos do Kafka por um back-end Java externo. Facilita a geração de insights prognósticos para profissionais da saúde através de IA Generativa, contando com consultas de resultados de exames anteriores dos respectivos pacientes.

Desenvolvimento

Através do PEX (Production EXtension) da InterSystems, ferramenta de extensibilidade que permite aprimoramento e personalização do comportamento do sistema, elaboramos uma Operação de Negócios. Este componente tem a tarefa de processar mensagens recebidas no formato FHIR dentro do sistema. Veja este exemplo:

import com.intersystems.enslib.pex.*;
import com.intersystems.jdbc.IRISObject;
import com.intersystems.jdbc.IRIS;
import com.intersystems.jdbc.IRISList;
import com.intersystems.gateway.GatewayContext;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class KafkaOperation extends BusinessOperation {
// Conexão ao InterSystems IRIS
private IRIS iris;

// Conexão ao Kafka
private Producer<Long, String> producer;

// Endereço do servidor do Kafka (se forem vários, separados por vírgula)
public String SERVERS;

// Nome do nosso Produtor
public String CLIENTID;

/// Caminho para o arquivo Config
public String CONFIG;

public void OnInit() throws Exception {
[...]
}

public void OnTearDown() throws Exception {
[...]
}

public Object OnMessage(Object request) throws Exception {
    IRISObject req = (IRISObject) request;
    LOGINFO("Received object: " + req.invokeString("%ClassName", 1));

    // Crie o registro
    String value = req.getString("Text");
    String topic = getTopicPush(req);
    final ProducerRecord<Long, String> record = new ProducerRecord<>(topic, value);

    // Envie o novo registro
    RecordMetadata metadata = producer.send(record).get();

    // Retorne as informações do regitro
    IRISObject response = (IRISObject)(iris.classMethodObject("Ens.StringContainer","%New",topic+"|"+metadata.offset()));
    return response;
}

private Producer<Long, String> createProducer() throws IOException {
[...]
}

private String getTopicPush(IRISObject req) {
[...]
}

[...]
}

`

Dentro da aplicação, o método getTopicPush assume a responsabilidade de identificar o tópico para o qual a mensagem será enviada.

A determinação do tópico a que a mensagem será enviada depende da existência de uma regra na global "quarantineRule", conforme lido no IRIS.

String code = FHIRcoding.path("code").asText();
String system = FHIRcoding.path("system").asText();

IRISList quarantineRule = iris.getIRISList("quarantineRule",code,system);

 String reference = quarantineRule.getString(1);
 String value = quarantineRule.getString(2);

 String observationValue = fhir.path("valueQuantity").path("value").asText()

Quando há a global ^quarantineRule, a validação do objeto FHIR pode ser validada.

private boolean quarantineValueQuantity(String reference, String value, String observationValue) {
    LOGINFO("quarantine rule reference/value: " + reference + "/" + value);
    double numericValue = Double.parseDouble(value);
    double numericObservationValue = Double.parseDouble(observationValue);

    if ("<".equals(reference)) {
        return numericObservationValue < numericValue;
    }
    else if (">".equals(reference)) {
        return numericObservationValue > numericValue;
    }
    else if ("<=".equals(reference)) {
        return numericObservationValue <= numericValue;
    }
    else if (">=".equals(reference)) {
        return numericObservationValue >= numericValue;
    }

    return false;
}

Exemplo prático:

Ao definir uma global, como:

Set ^quarantineRule("59462-2","http://loinc.org") = $LB(">","500") 

Isso estabelece uma regra para o código "59462-2" e o sistema ""http://loinc.org"" na global ^quarantineRule, especificando uma condição em que o valor é definido como quarentena quando ultrapassa 500. No aplicativo, o método getTopicPush pode então usar essa regra para determinar o tópico apropriado para enviar a mensagem com base no resultado da validação.

Dada a atribuição, o JSON abaixo seria enviado para quarentena, porque corresponde à condição especificada por ter:

 {
          "system": "http://loinc.org",
          "code": "59462-2",
          "display": "Testosterone"
}

"valueQuantity": { "value": 550, "unit": "ng/dL", "system": "http://unitsofmeasure.org", "code": "ng/dL" }

Observação FHIR:

{
    "resourceType": "Observation",
    "id": "3a8c7d54-1a2b-4c8f-b54a-3d2a7efc98c9",
    "status": "final",
    "category": [
      {
        "coding": [
          {
            "system": "http://terminology.hl7.org/CodeSystem/observation-category",
            "code": "laboratory",
            "display": "laboratory"
          }
        ]
      }
    ],
    "code": {
      "coding": [
        {
          "system": "http://loinc.org",
          "code": "59462-2",
          "display": "Testosterone"
        }
      ],
      "text": "Testosterone"
    },
    "subject": {
      "reference": "urn:uuid:274f5452-2a39-44c4-a7cb-f36de467762e"
    },
    "encounter": {
      "reference": "urn:uuid:100b4a8f-5c14-4192-a78f-7276abdc4bc3"
    },
    "effectiveDateTime": "2022-05-15T08:45:00+00:00",
    "issued": "2022-05-15T08:45:00.123+00:00",
    "valueQuantity": {
      "value": 550,
      "unit": "ng/dL",
      "system": "http://unitsofmeasure.org",
      "code": "ng/dL"
    }
}

O aplicativo Quarkus Java

Após o envio para o tópico desejado, um aplicativo Quarkus Java foi criado para receber exames em quarentena.

@ApplicationScoped
 public class QuarentineObservationEventListener {

@Inject
PatientService patientService;

@Inject
EventBus eventBus;

@Transactional
@Incoming("observation_quarantine")
public CompletionStage<Void> onIncomingMessage(Message<QuarentineObservation> quarentineObservationMessage) {
    var quarentineObservation = quarentineObservationMessage.getPayload();
    var patientId = quarentineObservation.getSubject()
            .getReference();
    var patient = patientService.addObservation(patientId, quarentineObservation);
    publishSockJsEvent(patient.getId(), quarentineObservation.getCode()
            .getText());
    return quarentineObservationMessage.ack();
}

private void publishSockJsEvent(Long patientId, String text) {
    eventBus.publish("monitor", MonitorEventDto.builder()
            .id(patientId)
            .message(" is on quarentine list by observation ." + text)
            .build());
}
 }

Esse segmento do sistema tem a tarefa de persistir as informações recebidas de Kafka, armazená-las nas observações do paciente no banco de dados e notificar a ocorrência ao monitor.

O monitor

Por fim, o monitor do sistema é responsável por fornecer uma visualização simples do front-end. Isso permite que os profissionais de saúde revisem os dados do paciente/exame e realizem as ações necessárias.

Implementação de langchainPT

Através do monitor, o sistema permite que os profissionais de saúde solicitem recomendações da IA ​​Generativa.

@Unremovable
@Slf4j
@ApplicationScoped
public class PatientRepository {
    @Tool("Get anamnesis information for a given patient id")
    public Patient getAnamenisis(Long patientId) {
        log.info("getAnamenisis called with id " + patientId);
        Patient patient = Patient.findById(patientId);
        return patient;
    }

    @Tool("Get the last clinical results for a given patient id")
    public List<Observation> getObservations(Long patientId) {
        log.info("getObservations called with id " + patientId);
        Patient patient = Patient.findById(patientId);
        return patient.getObservationList();
    }

}

segue implementação de langchain4j

@RegisterAiService(chatMemoryProviderSupplier = RegisterAiService.BeanChatMemoryProviderSupplier.class, tools = {PatientRepository.class})
public interface PatientAI {

    @SystemMessage("""
            You are a health care assistant AI. You have to recommend exams for patients based on history information.
            """)
    @UserMessage("""
             Your task is to recommend clinical exams for the patient id {patientId}.

             To complete this task, perform the following actions:
             1 - Retrieve anamnesis information for patient id {patientId}.
             2 - Retrieve the last clinical results for patient id {patientId}, using the property 'name' as the name of exam and 'value' as the value.
             3 - Analyse results against well known conditions of health care.

             Answer with a **single** JSON document containing:
             - the patient id in the 'patientId' key
             - the patient weight in the 'weight' key
             - the exam recommendation list in the 'recommendations' key, with properties exam, reason and condition.
             - the 'explanation' key containing an explanation of your answer, especially about well known diseases.

            Your response must be just the raw JSON document, without ```json, ``` or anything else.
             """)
    String recommendExams(Long patientId);
}

Dessa forma, o sistema pode auxiliar os profissionais de saúde a tomar decisões e realizar ações.

Vídeo de demonstração

VÍDEO

Autores

OBSERVAÇÃO:

O aplicativo https://openexchange.intersystems.com/package/fhir-pex está participando atualmente do InterSystems Java Contest 2023. Sinta-se à vontade para explorar a solução e não hesite em entrar em contato se tiver alguma dúvida ou precisar de informações adicionais. Recomendamos executar o aplicativo em seu ambiente local para uma experiência prática.
Obrigado pela oportunidade 😀!

Discussão (0)1
Entre ou crie uma conta para continuar