Enviando mensagens para Kafka
Olá!
Eu estava querendo dedicar algum tempo para implementar alguma DLL ou algo que eu pudesse usar do Caché e finalmente tive uma pequena ideia, se você está interessado em poder produzir mensagens que são enviadas para o Kafka rapidamente, você é no lugar certo ;-)
Antes de lhe entregar a ficha com o que vamos ver, vou fazer um resumo para que você decida se tem interesse em ler o artigo.
Neste artigo vamos focar "apenas" na parte de produzir mensagens e enviá-las para Kafka:
.png)
Como funciona?
Utilizo uma DLL .Net (Netframework 4.5) que fiz (está dentro da pasta dll do repositório)
Depois através de 2 classes que criei (podem consultar o código fonte no repositório: aqui ):
- Kafka.Helper.cls (Sirve para configurar e instalar todo).
- Kafka.Producer.cls (usado para criar mensagens e enviá-las para Kafka).
Podemos enviar mensagens para Kafka de nossas aulas ou rotinas.
Exemplo:
SendObjectMessage() N topic, message, resSet topic="mitopic" Set message=##class(%ZEN.proxyObject).%New() Set message.nombre="Dani" Set message.direccion="C/Falsa, 123" Set res=##class(Kafka.Producer).sendObject(topic, .message) Q
Te interessa? Bem, vamos ver em detalhes:
.png)
A DLL .Net usa uma implementação Confluent para enviar mensagens para um tópico Kafka (que devemos ter gerado anteriormente, deixo o arquivo docker-compose que criei para os testes, você terá que modificar o endereço ip para que corresponda o ip do computador onde você o implanta), (o arquivo docker-compose pode ser melhorado, mas será útil para testes):
Fichero docker-compose
services:
zookeeper:
image: zookeeper
restart: always
container_name: zookeeper
hostname: zookeeper
ports:
- 2181:2181
volumes:
- /kafka/zookeeper_data:/zookeeper/data
- /kafka/zookeeper_logs:/zookeeper/logs
- /kafka/zookeeper_conf:/zookeeper/conf
environment:
ZOO_MY_ID: 1
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
volumes:
- /kafka/data:/kafka/data
- /kafka/config:/kafka/config
environment:
KAFKA_ADVERTISED_HOST_NAME: 172.16.172.10
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
kafka_manager:
image: hlebalbau/kafka-manager:stable
container_name: kakfa-manager
restart: always
ports:
- "9000:9000"
environment:
ZK_HOSTS: "zookeeper:2181"
APPLICATION_SECRET: "random-secret"
command: -Dpidfile.path=/dev/null
Configuração em Cache e envio de mensagens:
Para defini-lo como Cache:
Iremos copiar o conteúdo da pasta dll para uma pasta no servidor onde temos o Caché instalado (copiei o que para c:\kafkaaqs ).
Em seguida, importaremos o pacote Kafka e abriremos a classe Kafka.Helper.cls.
Dentro dele vamos configurar os parâmetros de configuração do Gateway que executa a DLL
.png)
Si no sabes configurar el Gateway consulta este spoiler:
Configurar Gateway
.png)
Clique em Create New Gateway e preencha as informações (Cada configuração que corresponde a esta é um exemplo):
.png)
Uma vez configurado, executamos o método de instalação pelo terminal:
Do ##class(Kafka.Helper).install()
A seguir iremos configurar o tópico (Este tópico deve ser previamente definido no Kafa)
Definir topic:
Com o docker-compose iniciado:
docker-compose up -d
Abrimos o KafkaManager no navegador web (Também pode ser feito pelo terminal mas se você não estiver familiarizado com os comandos do Kafka, pode ser mais fácil para você dessa forma)
.png)
.png)
Preenchemos o nome do cluster e a configuração do Zookeepr (para a configuração do docker-compose, se você não tocou, é isso)
.png)
Depois de salvo, ele aparecerá em nossa lista de clusters, inserimos clicando no nome e passamos a criar nosso tópico:
.png)
No tópico teremos que atribuir: o nome, número de partições, fator de replicação... etc.
E pronto! Já temos nosso Tópico criado e pronto para começar a receber mensagens.
Criamos o tópico no cache:
New topic,server,keyIdSet topic = "mitopic"Set server = "172.16.172.5:9092"Set keyId = "miKey"Do ##class(Kafka.Helper).createTopic(topic,server,keyId)
(No servidor devemos colocar a porta e o ip configurados no docker-compose)
Os tópicos criados são salvos no ^KAFKA global
.png)
Sua estrutura é ^KAFA("TOPICS", nombreTopicX)
^KAFA("TOPICS", nombreTopicX,"keyId") = Id da chave enviado junto com as mensagens
^KAFA("TOPICS", nombreTopicX,"server") = ip kafka : puerto escucha
Podemos gerar quantos tópicos quisermos.
Assim que tivermos tudo configurado podemos finalmente começar a enviar mensagens !!
Eu implementei 2 métodos para fazer isso
1 - Envía un texto en formato JSON
2 - Envie um objeto %ZEN.proxyObject
3 - Não tive tempo mas se quiserem irei implementar a entrega com dynamicObjects e objetos de classes definidas também.
Opção 1 Enviar texto en formato JSON:
Set topic="mitopic"Set message="{""Valor"":""Ola""}"Set res=##class(Kafka.Producer).sendMessage(topic,message)
Opção 2 Enviar objeto %ZEN.proxyObject
Set topic="mitopic"Set message=##class(%ZEN.proxyObject).%New()Set message.nombre="Dani"Set message.direccion="C/Falsa, 123"Set res=##class(Kafka.Producer).sendObject(topic, .message)
Quando enviamos uma mensagem, se por acaso a comunicação falhar ou a mensagem não puder ser enviada por qualquer motivo, deixa o erro registrado em um subnível do ^KAFKA global dentro do tópico que falhou:
.png)
O que ele registra no nível "MESSAGE" é a mensagem no formato JSON (seja passada como objeto ou como texto)
O resultado retornado pela DLL .Net é gravado em resMessage.
Com isso já poderíamos enviar mensagens para o Kafka, obviamente algum assinante teria que estar configurado para os tópicos para que as mensagens que enviamos possam ser processadas e executar sua função.
O que eles poderiam ser? Qualquer um, envie um e-mail, SMS, gere um PDF, sincronize as informações recebidas com outro sistema... enfim, qualquer coisa que lhe vier à cabeça.
Pessoalmente, acho que é uma ferramenta muito, muito poderosa que nos permite desacoplar nossos desenvolvimentos e manter as lógicas separadas.
Espero que gostem da ideia, vou continuar desenvolvendo nas horas vagas então se tiverem alguma sugestão ou melhoria será muito bem vinda.
Obrigado por ler o artigo, e espero que tenha gostado.
Obrigado por ler!!.