Get Started with KubeMQ: Deploy, Integrate, and Scale Seamlessly
1. Deploy KubeMQ
Run the following Docker command:
# Starts the kubemq/kubemq image detached (-d) and publishes ports 8080, 50000 and 9090.
# The client examples in this guide connect to localhost:50000.
# Replace {enter your token here} with your own KubeMQ token before running.
docker run -d -p 8080:8080 -p 50000:50000 -p 9090:9090 -e KUBEMQ_TOKEN={enter your token here} kubemq/kubemq
3. Create a Simple Messaging Queue:
Below is a complete example that demonstrates how to send and receive messages using KubeMQ’s queues. This example shows how to:
1. Initialize a KubeMQ client
2. Send a message to a queue
3. Receive messages from a queue
4. Acknowledge received messages
package io.kubemq.example.queues;

import io.kubemq.sdk.common.ServerInfo;
import io.kubemq.sdk.queues.QueueMessage;
import io.kubemq.sdk.queues.QueueSendResult;
import io.kubemq.sdk.queues.QueuesClient;
import io.kubemq.sdk.queues.QueuesPollRequest;
import io.kubemq.sdk.queues.QueuesPollResponse;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

/**
 * Example showing how to send a message to a KubeMQ queue and poll it back.
 *
 * <p>The class builds a {@link QueuesClient}, sends one message to
 * {@code mytest-channel}, polls the same channel for a single message and
 * acknowledges it. Reject and requeue are shown as commented-out alternatives.
 */
public class QueueExample {

    private final QueuesClient queuesClient;
    private final String channelName = "mytest-channel";
    private final String address = "localhost:50000";
    private final String clientId = "kubeMQClientId";

    /** Creates the {@link QueuesClient} for {@code address} / {@code clientId} via its builder. */
    public QueueExample() {
        queuesClient = QueuesClient.builder()
                .address(address)
                .clientId(clientId)
                .build();
    }

    /**
     * Sends one message with sample tags and metadata to the queue and prints the send result.
     *
     * <p>Any {@link RuntimeException} raised while sending is reported on stderr and not rethrown.
     */
    public void sendQueueMessage() {
        try {
            Map<String, String> tags = new HashMap<>();
            tags.put("tag1", "kubemq");
            tags.put("tag2", "kubemq2");

            QueueMessage message = QueueMessage.builder()
                    // Explicit charset so the payload does not depend on the platform default.
                    .body("Sending data in queue message stream".getBytes(StandardCharsets.UTF_8))
                    .channel(channelName)
                    .metadata("Sample metadata")
                    .id(UUID.randomUUID().toString())
                    // Attach the tags built above; previously the map was created but never used.
                    .tags(tags)
                    .build();

            QueueSendResult sendResult = queuesClient.sendQueuesMessage(message);

            System.out.println("Message sent Response:");
            System.out.println("ID: " + sendResult.getId());
            System.out.println("Sent At: " + sendResult.getSentAt());
            System.out.println("Expired At: " + sendResult.getExpiredAt());
            System.out.println("Delayed To: " + sendResult.getDelayedTo());
            System.out.println("Is Error: " + sendResult.isError());
            if (sendResult.isError()) {
                System.out.println("Error: " + sendResult.getError());
            }
        } catch (RuntimeException e) {
            System.err.println("Failed to send queue message: " + e.getMessage());
        }
    }

    /**
     * Polls the queue for at most one message (10 second wait), prints it and acknowledges it.
     *
     * <p>Any {@link RuntimeException} raised while polling is reported on stderr and not rethrown.
     */
    public void receiveQueuesMessages() {
        try {
            QueuesPollRequest queuesPollRequest = QueuesPollRequest.builder()
                    .channel(channelName)
                    .pollMaxMessages(1)
                    .pollWaitTimeoutInSeconds(10)
                    .build();

            QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(queuesPollRequest);

            System.out.println("Received Message Response:");
            System.out.println("RefRequestId: " + pollResponse.getRefRequestId());
            System.out.println("ReceiverClientId: " + pollResponse.getReceiverClientId());
            System.out.println("TransactionId: " + pollResponse.getTransactionId());

            if (pollResponse.isError()) {
                System.out.println("Error: " + pollResponse.getError());
            } else {
                pollResponse.getMessages().forEach(msg -> {
                    System.out.println("Message ID: " + msg.getId());
                    System.out.println("Message Body: " + new String(msg.getBody(), StandardCharsets.UTF_8));

                    // Message handling options:

                    // 1. Acknowledge message (mark as processed)
                    msg.ack();

                    // 2. Reject message (won't be requeued)
                    // msg.reject();

                    // 3. Requeue message (send back to queue)
                    // msg.reQueue(channelName);
                });
            }
        } catch (RuntimeException e) {
            System.err.println("Failed to receive queue messages: " + e.getMessage());
        }
    }

    /**
     * Sends one message, waits a second, polls it back, then blocks forever.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping or waiting
     */
    public static void main(String[] args) throws InterruptedException {
        QueueExample example = new QueueExample();
        System.out.println("Starting to send messages & Receive message from queue stream: " + new java.util.Date());
        example.sendQueueMessage();
        Thread.sleep(1000);
        example.receiveQueuesMessages();

        // Keep the main thread running to handle responses.
        // The latch is never counted down, so the process runs until it is terminated.
        CountDownLatch latch = new CountDownLatch(1);
        latch.await();
    }
}
package io.kubemq.example;

import io.kubemq.sdk.queues.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Spring Boot entry point that sends one message to {@code queue_channel},
 * polls it back, and then closes the KubeMQ client.
 */
@SpringBootApplication
@ComponentScan(basePackages = {"io.kubemq"})
public class QueuesExample implements CommandLineRunner {

    @Autowired
    private QueuesExampleService queuesExampleService;

    public static void main(String[] args) {
        SpringApplication.run(QueuesExample.class, args);
    }

    /**
     * Runs the send / receive round trip once the application context is ready.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the send fails or the thread is interrupted while sleeping
     */
    @Override
    public void run(String... args) throws Exception {
        try {
            queuesExampleService.sendQueueMessage("queue_channel");
            queuesExampleService.receiveQueuesMessages("queue_channel");
            // NOTE(review): presumably gives in-flight work time to finish before closing - confirm.
            Thread.sleep(1000);
        } finally {
            // Close the client even when sending or sleeping throws, so it is not leaked.
            queuesExampleService.shutdown();
        }
    }
}

/**
 * Thin service around an injected {@link QueuesClient}.
 * The client bean itself is defined outside this file.
 */
@Service
class QueuesExampleService {

    @Autowired
    private QueuesClient queuesClient;

    /**
     * Sends a single message to the given queue channel and prints the send result.
     *
     * @param channelName queue channel to send to
     */
    public void sendQueueMessage(String channelName) {
        QueueMessage message = QueueMessage.builder()
                // Explicit charset so the payload does not depend on the platform default.
                .body("Sending data in queue message".getBytes(StandardCharsets.UTF_8))
                .channel(channelName)
                .metadata("metadata")
                .id(UUID.randomUUID().toString())
                .build();

        QueueSendResult sendResult = queuesClient.sendQueuesMessage(message);
        System.out.println("Message sent Response: " + sendResult);
    }

    /**
     * Polls the given channel for at most one message (10 second wait), prints it and
     * acknowledges it. A {@link RuntimeException} is reported on stderr and not rethrown.
     *
     * @param channelName queue channel to poll
     */
    public void receiveQueuesMessages(String channelName) {
        try {
            QueuesPollRequest queuesPollRequest = QueuesPollRequest.builder()
                    .channel(channelName)
                    .pollMaxMessages(1)
                    .pollWaitTimeoutInSeconds(10)
                    .build();

            QueuesPollResponse pollResponse = queuesClient.receiveQueuesMessages(queuesPollRequest);

            System.out.println("Received Message Response:");
            System.out.println("RefRequestId: " + pollResponse.getRefRequestId());
            System.out.println("ReceiverClientId: " + pollResponse.getReceiverClientId());
            System.out.println("TransactionId: " + pollResponse.getTransactionId());

            if (pollResponse.isError()) {
                System.out.println("Error: " + pollResponse.getError());
            } else {
                pollResponse.getMessages().forEach(msg -> {
                    System.out.println("Message ID: " + msg.getId());
                    System.out.println("Message Body: " + new String(msg.getBody(), StandardCharsets.UTF_8));
                    msg.ack();
                });
            }
        } catch (RuntimeException e) {
            System.err.println("Failed to receive queue messages: " + e.getMessage());
        }
    }

    /** Closes the KubeMQ client; a failure to close is reported on stderr and not rethrown. */
    public void shutdown() {
        try {
            queuesClient.close();
        } catch (RuntimeException e) {
            System.err.println("Failed to close KubeMQ client: " + e.getMessage());
        }
    }
}
using System.Text;
using KubeMQ.SDK.csharp.QueueStream;

namespace HelloWorld
{
    // Sends one message to the "hello_world" queue, polls it back, prints it and acknowledges it.
    // Task, List<T>, Console and friends are used without explicit using directives,
    // so this file relies on the project's implicit/global usings.
    class Program
    {
        static async Task Main(string[] args)
        {
            QueueStream client = new QueueStream("localhost:50000", "Some-client-id");

            Message msg = new Message()
            {
                MessageID = "1",
                Queue = "hello_world",
                Body = "hello kubemq - sending an queue message"u8.ToArray(),
            };

            // The element type must be spelled out: a bare `List` does not compile.
            List<Message> messages = new List<Message> { msg };
            await client.Send(new SendRequest(messages));

            // Non-blocking pause; Thread.Sleep would block the thread inside an async method.
            await Task.Delay(1000);

            PollRequest pollRequest = new PollRequest()
            {
                Queue = "hello_world",
                WaitTimeout = 1000,
                MaxItems = 1,
            };

            PollResponse response = await client.Poll(pollRequest);
            foreach (var receiveMsg in response.Messages)
            {
                Console.WriteLine(Encoding.UTF8.GetString(receiveMsg.Body));
                receiveMsg.Ack();
            }
        }
    }
}
from kubemq.queues import *


def main():
    """Send one message to queue "q1", then poll it back, print it and acknowledge it.

    If the poll result reports an error, the error is printed and the function returns
    without processing any messages.
    """
    with Client(address="localhost:50000") as client:
        outgoing = QueueMessage(
            channel="q1",
            body=b"some-simple_queue-queue-message",
            metadata="some-metadata",
        )
        send_result = client.send_queues_message(outgoing)
        print(f"Queue Message Sent: {send_result}")

        # auto_ack is off, so each received message is acknowledged explicitly below.
        result = client.receive_queues_messages(
            channel="q1",
            max_messages=1,
            wait_timeout_in_seconds=10,
            auto_ack=False,
        )
        if result.is_error:
            print(f"{result.error}")
            return

        for message in result.messages:
            text = message.body.decode('utf-8')
            print(f"Id:{message.id}, Body:{text}")
            message.ack()


if __name__ == "__main__":
    main()
package main

import (
	"context"
	"log"

	"github.com/kubemq-io/kubemq-go/queues_stream"
)

// main sends one message to the "queues_example" channel, polls it back,
// logs its body and acknowledges it.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queuesClient, err := queues_stream.NewQueuesStreamClient(ctx,
		queues_stream.WithAddress("localhost", 50000),
		queues_stream.WithClientId("example"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		err := queuesClient.Close()
		if err != nil {
			log.Fatal(err)
		}
	}()

	msg := queues_stream.NewQueueMessage().
		SetChannel("queues_example").
		SetBody([]byte("hellow world"))
	_, err = queuesClient.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}

	// Auto-ack is disabled, so every received message is acknowledged explicitly below.
	pollRequest := queues_stream.NewPollRequest().
		SetChannel("queues_example").
		SetMaxItems(1).
		SetWaitTimeout(1).
		SetAutoAck(false)
	result, err := queuesClient.Poll(ctx, pollRequest)
	if err != nil {
		log.Fatal(err)
	}

	// Indexing Messages[0] unconditionally panics when the poll returns nothing,
	// so handle the empty case first and log inside the loop instead.
	if len(result.Messages) == 0 {
		log.Println("No messages received")
		return
	}
	for _, received := range result.Messages {
		log.Printf("Received Message: %s", string(received.Body))
		if err := received.Ack(); err != nil {
			log.Fatal(err)
		}
	}
}
import {QueuesClient} from 'kubemq-js/lib/queues'
import {Config} from 'kubemq-js/lib/config'
import {Utils} from "kubemq-js";

/**
 * Sends one message to the 'queues.single' channel and logs the send result,
 * then subscribes to the same channel and logs every message delivered to the
 * callback. The subscription is cancelled after 500000 ms.
 */
async function main() {
  const opts: Config = {
    address: 'localhost:50000',
    clientId: Utils.uuid(),
  };
  const queuesClient = new QueuesClient(opts);

  // Send: log the result on success, or the failure reason on error.
  try {
    const sendResult = await queuesClient.send({
      channel: 'queues.single',
      body: Utils.stringToBytes('queue message'),
    });
    console.log(sendResult);
  } catch (reason) {
    console.error(reason);
  }

  const subscribeRequest = {
    channel: 'queues.single',
    maxNumberOfMessages: 1,
    waitTimeoutSeconds: 5,
  };

  // Invoked by the client with either an error or a batch of messages.
  const onResponse = (err, response) => {
    if (err) {
      console.log(err);
      return;
    }
    for (const msg of response.messages) {
      console.log(msg);
    }
  };

  // Deliberately not awaited: main() returns while the subscription stays active.
  queuesClient.subscribe(subscribeRequest, onResponse).then(async (subscription) => {
    await new Promise((wake) => setTimeout(wake, 500000));
    subscription.unsubscribe();
  });
}

main();
4. Next steps:
Congratulations! You’ve successfully installed KubeMQ, deployed your first messaging service, and explored sending and receiving messages. Now that you have the basics down, here are some next steps to help you dive deeper into KubeMQ and fully leverage its capabilities:
1. Explore Advanced Messaging Patterns
KubeMQ supports a wide range of messaging patterns, including Pub/Sub, Event Streaming, RPC, and more. Explore how you can implement these patterns in your applications to meet various business requirements.
KubeMQ Messaging Patterns
2. Expand Your Development with KubeMQ SDKs
To further enhance your applications, KubeMQ offers a range of SDKs that allow you to seamlessly integrate messaging capabilities across various programming languages. These SDKs provide the tools you need to build robust, scalable, and feature-rich solutions tailored to your development environment.
3. Join the KubeMQ Community Slack
Connect with other KubeMQ users and developers to share knowledge, get help, and stay updated on the latest features and best practices.
Join the KubeMQ Community Slack
4. Get Professional Support
If you need additional help or want to optimize your KubeMQ deployment, consider reaching out for professional support. The KubeMQ team offers consulting services to help you design, implement, and scale your messaging infrastructure.
Contact KubeMQ professional support