NATS IO (High Performance Cloud Native Messaging)
Overview
- NATS is an open source messaging system
- NATS server is written in Go programming language
- Apcera develops and provides support for NATS (originally developed by Derek Collison in Ruby)
- The core design principles of NATS are Performance, Scalability and Ease of Use
- Security - Pluggable Integration with External Authorisation
- NATS Streaming Server setup
- Java Program - Publisher & Subscriber
- Integration With Spring Boot
- Testing with JMeter by Pushing millions of messages
- Scalability
NATS Streaming Server setup
-------------------------------------
1) Download the NATS Streaming server from https://nats.io/download/nats-io/nats-streaming-server/ (nats-streaming-server-v0.4.0-linux-386)
2) Unzip the file in your own directory
3) Open Terminal (Linux) and follow the path to get nats-streaming-server-v0.4.0.sh
4) Run nats-streaming-server-v0.4.0.sh
Java Program - Publisher & Subscriber
-------------------------------------------------
1) Write a Publisher class (Publisher.java)
2) Export the Runnable jar file [Publisher.jar]
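3) Run the Jar [ java -jar Publisher.jar alarm "Hello Shailesh Bhaskar..!" ] (the last two arguments are the subject and the message)
4) Write a Subscriber class (Subscriber.java)
5) Export the Runnable jar file [Subscriber.jar]
6) Run the Jar [ java -jar Subscriber.jar alarm ] (the last argument is the subject; it must match the subject used by the Publisher)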
Note: Start the NATS server first, then run the Subscriber, and only then run the Publisher. You are done.
Publisher.java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
/**
 * Command-line tool that publishes a single message to a NATS subject.
 *
 * <p>Arguments: {@code [options] <subject> <message>}. The last two arguments are
 * always taken as the subject and the message payload; anything before them must
 * be a recognised option ({@code -s/--server <url>}).
 */
public class Publisher {
    /** NATS server URL; replaced by the value of {@code -s/--server} when given. */
    public String url = "nats://localhost:4222";
    /**
     * Factory used by {@link #run()} to open the connection. It is created in the
     * constructor, after argument parsing, so that it uses the effective URL.
     */
    public ConnectionFactory cf;
    String subject = "alarm";
    String payload = "Helloooo";
    String usageString =
            "\nUsage: java Publisher [options] <subject> <message>\n\nOptions:\n"
            + " -s, --server <url> NATS server URL (default: " + url + ")\n";

    /**
     * Parses the command line and prepares the connection factory.
     *
     * @param args command-line arguments: {@code [options] <subject> <message>}
     * @throws IOException declared for callers; not thrown by this constructor itself
     * @throws TimeoutException declared for callers; not thrown by this constructor itself
     */
    Publisher(String[] args) throws IOException, TimeoutException {
        // Arrays.toString shows the actual arguments rather than the array identity.
        System.out.println("args====" + Arrays.toString(args));
        parseArgs(args);
        if (subject == null) {
            usage();
        }
        // Report and use the URL only after parsing, so --server takes effect.
        System.out.println("URL====" + url);
        cf = new ConnectionFactory(url);
    }

    /** Prints the usage text to stderr and terminates the JVM with exit code -1. */
    void usage() {
        System.err.println(usageString);
        System.exit(-1);
    }

    /**
     * Opens a connection, publishes {@code payload} on {@code subject}, and closes
     * the connection again.
     *
     * @throws IOException if the connection cannot be opened or the publish fails
     * @throws TimeoutException if connecting times out; propagated so that the caller
     *     can report the failure instead of continuing as if the message was sent
     */
    void run() throws IOException, TimeoutException {
        try (Connection nc = cf.createConnection()) {
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
            nc.publish(subject, payload.getBytes(StandardCharsets.UTF_8));
            System.err.printf("Published [%s] : '%s'\n", subject, payload);
        }
    }

    /**
     * Extracts subject, payload and options from the command line.
     *
     * <p>Calls {@link #usage()} (which exits) when fewer than two arguments are
     * given, when an option is missing its value, or when an unknown token appears
     * before the subject.
     *
     * @param args raw command-line arguments
     */
    private void parseArgs(String[] args) {
        if (args == null || args.length < 2) {
            usage();
            return;
        }
        // The last two arguments are the subject and the payload, in that order.
        payload = args[args.length - 1];
        subject = args[args.length - 2];

        // Everything before the subject must be option flags and their values.
        int optionEnd = args.length - 2;
        for (int i = 0; i < optionEnd; i++) {
            String arg = args[i];
            switch (arg) {
                case "-s":
                case "--server":
                    if (i + 1 >= optionEnd) {
                        usage();
                        return;
                    }
                    url = args[++i];
                    break;
                default:
                    System.err.printf("Unexpected token: '%s'\n", arg);
                    usage();
                    return;
            }
        }
    }

    /**
     * Publishes a message to a subject.
     *
     * @param args the subject, message payload, and other arguments
     */
    public static void main(String[] args) {
        try {
            System.out.println("Started=========");
            new Publisher(args).run();
            System.out.println("Ended=========");
        } catch (IOException | TimeoutException e) {
            // Connection or publish failure: report it and signal failure to the shell.
            e.printStackTrace();
            System.exit(-1);
        }
    }
}
Subscriber.java
public class Subscriber {
public String url="nats://192.168.1.66:4222";
public ConnectionFactory cf = new ConnectionFactory(url);
private String subject;
private String qgroup;
String usageString = "\nUsage: java Subscriber "+subject+"\n\nOptions:\n"
+ " -s, --server <url> NATS server URL(default: "
+ url + ")\n"
+ " -q, --qgroup <name> Queue group\n";
Subscriber(String[] args) {
parseArgs(args);
if (subject == null) {
usage();
}
}
void usage() {
System.err.println(usageString);
System.exit(-1);
}
void run() throws IOException, TimeoutException {
// ConnectionFactory cf = new ConnectionFactory(url);
try (final Connection nc = cf.createConnection()) {
// System.out.println("Connected successfully to " + cf.getNatsUrl());
final AtomicInteger count = new AtomicInteger();
try (final Subscription sub = nc.subscribe(subject, qgroup, new MessageHandler() {
@Override
public void onMessage(Message m) {
System.out.printf("[#%d] Received on [%s]: '%s'\n", count.incrementAndGet(),
m.getSubject(), m);
}
})) {
System.out.printf("Listening on [%s]\n", subject);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
System.err.println("\nCaught CTRL-C, shutting down gracefully...\n");
try {
sub.unsubscribe();
} catch (IOException e) {
// log.error("Problem unsubscribing", e);
System.out.println("Problem unsubscribing"+ e);
}
nc.close();
}
}));
while (true) {
// loop forever
}
}
}
}
private void parseArgs(String[] args) {
if (args == null || args.length < 1) {
usage();
return;
}
List<String> argList = new ArrayList<String>(Arrays.asList(args));
// The last arg should be subject
// get the subject and remove it from args
subject = argList.remove(argList.size() - 1);
// Anything left is flags + args
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String arg = it.next();
switch (arg) {
case "-s":
case "--server":
if (!it.hasNext()) {
usage();
}
it.remove();
url = it.next();
it.remove();
continue;
case "-q":
case "--qgroup":
if (!it.hasNext()) {
usage();
}
it.remove();
qgroup = it.next();
it.remove();
continue;
default:
System.err.printf("Unexpected token: '%s'\n", arg);
usage();
break;
}
}
}
/**
* Subscribes to a subject.
*
* @param args the subject, cluster info, and subscription options
*/
public static void main(String[] args) {
try {
new Subscriber(args).run();
} catch (IOException | TimeoutException e) {
// log.error("Couldn't create Subscriber", e);
System.out.println("Couldn't create Subscriber"+ e);
System.exit(-1);
}
System.exit(0);
}
Comments
Post a Comment