Get Started with KubeMQ: Deploy, Integrate, and Scale Seamlessly

1. Deploy KubeMQ

Run the following docker command

docker run -d -p 8080:8080 -p 50000:50000 -p 9090:9090 -e KUBEMQ_TOKEN={enter your token here} kubemq/kubemq

2. Install KubeMQ SDK:

   io.kubemq.sdk
   kubemq-sdk-Java
   LATEST
        io.kubemq.sdk
        kubemq-sdk-springboot
        LATEST
dotnet add package KubeMQ.SDK.csharp
pip install kubemq
go get github.com/kubemq-io/kubemq-go
npm install kubemq-js

3. Create a Simple Messaging Queue:

Below is a comprehensive example that demonstrates how to send and receive messages using KubeMQ’s queues. This example showcases:

1. Initialize a KubeMQ client

2. Send a message to a queue

3. Receive messages from a queue

4. Acknowledge received messages

package io.kubemq.example.queues;

import io.kubemq.sdk.common.ServerInfo;
import io.kubemq.sdk.queues.QueueMessage;
import io.kubemq.sdk.queues.QueueSendResult;
import io.kubemq.sdk.queues.QueuesClient;
import io.kubemq.sdk.queues.QueuesPollRequest;
import io.kubemq.sdk.queues.QueuesPollResponse;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

/**
 * Example class demonstrating how to poll the queue messages using from KubeMQ
 * This class initializes the KubeMQClient
 * and QueuesClient, and handles the message polling, ack, requeue, reject.
 */
public class QueueExample {

    private final QueuesClient queuesClient;
    private final String channelName = "mytest-channel";
    private final String address = "localhost:50000";
    private final String clientId = "kubeMQClientId";

    public QueueExample() {
        // Create QueuesClient using the builder pattern
        queuesClient = QueuesClient.builder()
                  .address(address)
                .clientId(clientId)
                .build();
    }

    public void sendQueueMessage() {
        try {
            Map tags = new HashMap<>();
            tags.put("tag1", "kubemq");
            tags.put("tag2", "kubemq2");
            QueueMessage message = QueueMessage.builder()
                    .body("Sending data in queue message stream".getBytes())
                    .channel(channelName)
                    .metadata("Sample metadata")
                    .id(UUID.randomUUID().toString())
                    .build();

            QueueSendResult sendResult = queuesClient.sendQueuesMessage(message);

            System.out.println("Message sent Response:");
            System.out.println("ID: " + sendResult.getId());
            System.out.println("Sent At: " + sendResult.getSentAt());
            System.out.println("Expired At: " + sendResult.getExpiredAt());
            System.out.println("Delayed To: " + sendResult.getDelayedTo());
            System.out.println("Is Error: " + sendResult.isError());
            if (sendResult.isError()) {
                System.out.println("Error: " + sendResult.getError());
            }
        } catch (RuntimeException e) {
            System.err.println("Failed to send queue message: " + e.getMessage());
        }
    }

    public void receiveQueuesMessages() {
        try {
            QueuesPollRequest queuesPollRequest = QueuesPollRequest.builder()
                    .channel(channelName)
                    .pollMaxMessages(1)
                    .pollWaitTimeoutInSeconds(10)
                    .build();

            QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(queuesPollRequest);

            System.out.println("Received Message Response:");
            System.out.println("RefRequestId: " + pollResponse.getRefRequestId());
            System.out.println("ReceiverClientId: " + pollResponse.getReceiverClientId());
            System.out.println("TransactionId: " + pollResponse.getTransactionId());

            if (pollResponse.isError()) {
                System.out.println("Error: " + pollResponse.getError());
            } else {
                pollResponse.getMessages().forEach(msg -> {
                    System.out.println("Message ID: " + msg.getId());
                    System.out.println("Message Body: " + new String(msg.getBody()));

                    // Message handling options:

                    // 1. Acknowledge message (mark as processed)
                    msg.ack();

                    // 2. Reject message (won't be requeued)
                    // msg.reject();

                    // 3. Requeue message (send back to queue)
                    // msg.reQueue(channelName);
                });
            }

        } catch (RuntimeException e) {
            System.err.println("Failed to receive queue messages: " + e.getMessage());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueExample example = new QueueExample();
        System.out.println("Starting to send messages & Receive message from queue stream: " + new java.util.Date());
        example.sendQueueMessage();
        Thread.sleep(1000);
        example.receiveQueuesMessages();
        // Keep the main thread running to handle responses
        CountDownLatch latch = new CountDownLatch(1);
        latch.await();
    }
}
package io.kubemq.example;

import io.kubemq.sdk.queues.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Service;
import java.util.UUID;

@SpringBootApplication
@ComponentScan(basePackages = {"io.kubemq"})
public class QueuesExample implements CommandLineRunner {

    @Autowired
    private QueuesExampleService queuesExampleService;

    public static void main(String[] args) {
        SpringApplication.run(QueuesExample.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        queuesExampleService.sendQueueMessage("queue_channel");
        queuesExampleService.receiveQueuesMessages("queue_channel");
        Thread.sleep(1000);
        queuesExampleService.shutdown();
    }
}

@Service
class QueuesExampleService {

    @Autowired
    private QueuesClient queuesClient;

    public void sendQueueMessage(String channelName) {
        QueueMessage message = QueueMessage.builder()
                .body(("Sending data in queue message").getBytes())
                .channel(channelName)
                .metadata("metadata")
                .id(UUID.randomUUID().toString())
                .build();
        QueueSendResult sendResult = queuesClient.sendQueuesMessage(message);
        System.out.println("Message sent Response: " + sendResult);
    }

    public void receiveQueuesMessages(String channelName) {
        try {
            QueuesPollRequest queuesPollRequest = QueuesPollRequest.builder()
                    .channel(channelName)
                    .pollMaxMessages(1)
                    .pollWaitTimeoutInSeconds(10)
                    .build();

            QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(queuesPollRequest);

            System.out.println("Received Message Response:");
            System.out.println("RefRequestId: " + pollResponse.getRefRequestId());
            System.out.println("ReceiverClientId: " + pollResponse.getReceiverClientId());
            System.out.println("TransactionId: " + pollResponse.getTransactionId());

            if (pollResponse.isError()) {
                System.out.println("Error: " + pollResponse.getError());
            } else {
                pollResponse.getMessages().forEach(msg -> {
                    System.out.println("Message ID: " + msg.getId());
                    System.out.println("Message Body: " + new String(msg.getBody()));
                    msg.ack();
                });
            }
        } catch (RuntimeException e) {
            System.err.println("Failed to receive queue messages: " + e.getMessage());
        }
    }

    public void shutdown() {
        try {
            queuesClient.close();
        } catch (RuntimeException e) {
            System.err.println("Failed to close KubeMQ client: " + e.getMessage());
        }
    }
}
using System.Text;
using KubeMQ.SDK.csharp.QueueStream;

namespace HelloWorld
{
    class Program
    {
        static async Task Main(string[] args)
        {
            QueueStream client = new QueueStream("localhost:50000", "Some-client-id");
            Message msg = new Message()
            {
                MessageID = "1",
                Queue = "hello_world",
                Body = "hello kubemq - sending an queue message"u8.ToArray(),
            };
            List messages = new List { msg };
            await client.Send(new SendRequest(messages));
            Thread.Sleep(1000);
            PollRequest pollRequest = new PollRequest()
            {
                Queue = "hello_world",
                WaitTimeout = 1000,
                MaxItems = 1,
            };
            PollResponse response = await client.Poll(pollRequest);
            foreach (var receiveMsg in response.Messages)
            {
                Console.WriteLine(Encoding.UTF8.GetString(receiveMsg.Body));
                receiveMsg.Ack();
            }
        }
    }
}
from kubemq.queues import *

def main():
    with Client(address="localhost:50000") as client:
        send_result = client.send_queues_message(
            QueueMessage(
                channel="q1",
                body=b"some-simple_queue-queue-message",
                metadata="some-metadata",
            )
        )
        print(f"Queue Message Sent: {send_result}")

        result = client.receive_queues_messages(
            channel="q1",
            max_messages=1,
            wait_timeout_in_seconds=10,
            auto_ack=False,
        )
        if result.is_error:
            print(f"{result.error}")
            return
        for message in result.messages:
            print(f"Id:{message.id}, Body:{message.body.decode('utf-8')}")
            message.ack()

if __name__ == "__main__":
    main()
package main

import (
  "context"
  "github.com/kubemq-io/kubemq-go/queues_stream"
  "log"
)

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
    queues_stream.WithAddress("localhost", 50000),
    queues_stream.WithClientId("example"))
  if err != nil {
    log.Fatal(err)
  }
  defer func() {
    err := queuesClient.Close()
    if err != nil {
      log.Fatal(err)
    }
  }()
  msg := queues_stream.NewQueueMessage().
    SetChannel("queues_example").
    SetBody([]byte("hellow world"))
  _, err = queuesClient.Send(ctx, msg)
  if err != nil {
    log.Fatal(err)
  }

  pollRequest := queues_stream.NewPollRequest().
    SetChannel("queues_example").
    SetMaxItems(1).
    SetWaitTimeout(1).
    SetAutoAck(false)
  result, err := queuesClient.Poll(ctx, pollRequest)
  if err != nil {
    log.Fatal(err)
  }
  log.Printf("Received Message: %s", string(result.Messages[0].Body))
  for _, msg := range result.Messages {
    if err := msg.Ack(); err != nil {
      log.Fatal(err)
    }
  }
}
import { Config, QueuesClient, QueuesPollRequest, Utils } from 'kubemq-js'

async function main() {
  const opts: Config = {
    address: 'localhost:50000',
    clientId: 'kubeMQClientId-ts',
  };
  const queuesClient = new QueuesClient(opts);

  //Send to single channel
  await queuesClient
    .sendQueuesMessage({
      channel: 'queues.single',
      body: Utils.stringToBytes('queue message'),
      policy: {expirationSeconds:3600, delaySeconds:1, maxReceiveCount:3, maxReceiveQueue: 'dlq-queues.single'},
    })
    .then((result) => console.log(result))
    .catch((reason) => console.error(reason));

  //Receive Queue Message
  const pollRequest = new QueuesPollRequest({
    channel: 'queues.single',
    pollMaxMessages: 1, // Maps to maxNumberOfMessages
    pollWaitTimeoutInSeconds: 10, // Maps to waitTimeoutSeconds
    autoAckMessages: false // Optional: add based on your needs
  });

  // Use the properties of QueuesPollRequest in the receiveQueuesMessages function
  await queuesClient
    .receiveQueuesMessages(pollRequest)
    .then((response) => {
      response.messages.forEach((msg) => {
        console.log(msg);
        msg.ack()
      });
    })
    .catch((reason) => {
      console.error(reason);
    });
}

main();

Next steps:

Congratulations! You’ve successfully installed KubeMQ, deployed your first messaging service, and explored sending and receiving messages. Now that you have the basics down, here are some next steps to help you dive deeper into KubeMQ and fully leverage its capabilities:

1. Explore Advanced Messaging Patterns

KubeMQ supports a wide range of messaging patterns, including Pub/Sub, Event Streaming, RPC, and more. Explore how you can implement these patterns in your applications to meet various business requirements.
KubeMQ Messaging Patterns

2. Expand Your Development with KubeMQ SDKs

To further enhance your applications, KubeMQ offers a range of SDKs that allow you to seamlessly integrate messaging capabilities across various programming languages. These SDKs provide the tools you need to build robust, scalable, and feature-rich solutions tailored to your development environment.

Java SDK Documentation

Spring Boot SDK Documentation

.NET SDK Documentation

Go SDK Documentation

Python SDK Documentation

Node.js SDK Documentation

3. Join the KubeMQ Community Slack

Connect with other KubeMQ users and developers to share knowledge, get help, and stay updated on the latest features and best practices.
Join the KubeMQ Community Slack

4. Get Professional Support

If you need additional help or want to optimize your KubeMQ deployment, consider reaching out for professional support. The KubeMQ team offers consulting services to help you design, implement, and scale your messaging infrastructure.
Contact KubeMQ professional support

Send a Message