NATS IO (High Performance Cloud Native Messaging)

Overview

  • NATS is an open source messaging system
  • NATS server is written in Go programming language
  • Apcera develops and provide support for NATS (Developed by Derek Collision in Ruby earlier)
  • The core design principle of NATS are Performance, Scalability and Easy of Use
  • Security - Pluggable Integration with External Authorisation
Required Activities

  • NATS Streaming Server setup
  • Java Program - Publisher & Subscriber
  • Integration With Spring Boot
  • Testing with JMeter by Pushing millions of messages
  • Scalability


NATS Streaming Server setup
-------------------------------------

1) Download the NATS Streaming server from https://nats.io/download/nats-io/nats-streaming-server/ (nats-streaming-server-v0.4.0-linux-386)

2) Unzip the file in your own directory

3) Open Terminal (Linux) and follow the path to get nats-streaming-server-v0.4.0.sh

4) Run nats-streaming-server-v0.4.0.sh 


Java Program - Publisher & Subscriber
-------------------------------------------------

1) Write a Publisher class (Publisher.java)
2) Export the Runnable jar file [Publisher.jar]
3) Run the Jar [ java -jar Publisher.jar --server "Hello Shailesh Bhaskar..!"]
4) Write a Subscriber class (Subscriber.java)
5) Export the Runnable jar file(Subscriber.jar)
6) Run the Jar [ java -jar Subscriber.jar --server]

Note: Start the NATS server and Run the Subscriber first and then Publisher. You are done..

Publisher.java

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;

import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;

public class Publisher {
public String url="nats://localhost:4222";
public ConnectionFactory cf = new ConnectionFactory(url);
    String subject="alarm";
    String payload="Helloooo";

    String usageString =
            "\nUsage: java OldPublisher [options]"+subject+"Hi message"+"\n\nOptions:\n"
                    + "    -s, --server   <url>            STAN server URL(s)\n";

    Publisher(String[] args) throws IOException, TimeoutException {
    System.out.println("URL===="+url);
    System.out.println("args===="+args);
        parseArgs(args);
        if (subject == null) {
            usage();
        }
    }

    void usage() {
        System.err.println(usageString);
        System.exit(-1);
    }

    void run() throws IOException, TimeoutException {
    
       // ConnectionFactory cf = new ConnectionFactory(url);       
        try (Connection nc = cf.createConnection()) {
            nc.publish(subject, payload.getBytes());
            System.err.printf("Published [%s] : '%s'\n", subject, payload);
        } catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
    }

    private void parseArgs(String[] args) {
        if (args == null || args.length < 2) {
            usage();
            return;
        }

        List<String> argList = new ArrayList<String>(Arrays.asList(args));

        // The last arg should be subject and payload
        // get the payload and remove it from args
        payload = argList.remove(argList.size() - 1);

        // get the subject and remove it from args
        subject = argList.remove(argList.size() - 1);;

        // Anything left is flags + args
        Iterator<String> it = argList.iterator();
        while (it.hasNext()) {
            String arg = it.next();
            switch (arg) {
                case "-s":
                case "--server":
                    if (!it.hasNext()) {
                        usage();
                    }
                    it.remove();
                    url = it.next();
                    it.remove();
                    continue;
                default:
                    System.err.printf("Unexpected token: '%s'\n", arg);
                    usage();
                    break;
            }
        }
    }

    /**
     * Publishes a message to a subject.
     * 
     * @param args the subject, message payload, and other arguments
     */
    public static void main(String[] args) {
        try {
        System.out.println("Started=========");
            new Publisher(args).run();
            System.out.println("Ended=========");
        } catch (IOException | TimeoutException e) {       
            // TODO Auto-generated catch block
            e.printStackTrace();
            System.exit(-1);
        }
       // System.exit(0);
    }

}


Subscriber.java

public class Subscriber {
public String url="nats://192.168.1.66:4222";
public ConnectionFactory cf = new ConnectionFactory(url);
    
    private String subject;
    private String qgroup;

    String usageString = "\nUsage: java Subscriber "+subject+"\n\nOptions:\n"
            + "    -s, --server   <url>            NATS server URL(default: "
            + url + ")\n"
            + "    -q, --qgroup   <name>           Queue group\n";    
   
    Subscriber(String[] args) {
        parseArgs(args);
        if (subject == null) {
            usage();
        }
    }

    void usage() {
        System.err.println(usageString);
        System.exit(-1);
    }

    void run() throws IOException, TimeoutException {
       // ConnectionFactory cf = new ConnectionFactory(url);

        try (final Connection nc = cf.createConnection()) {
            // System.out.println("Connected successfully to " + cf.getNatsUrl());
            final AtomicInteger count = new AtomicInteger();
            try (final Subscription sub = nc.subscribe(subject, qgroup, new MessageHandler() {
                @Override
                public void onMessage(Message m) {
                    System.out.printf("[#%d] Received on [%s]: '%s'\n", count.incrementAndGet(),
                            m.getSubject(), m);
                }
            })) {
                System.out.printf("Listening on [%s]\n", subject);
                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.err.println("\nCaught CTRL-C, shutting down gracefully...\n");
                        try {
                            sub.unsubscribe();
                        } catch (IOException e) {
                           // log.error("Problem unsubscribing", e);
                        System.out.println("Problem unsubscribing"+ e);
                        }
                        nc.close();
                    }
                }));
                while (true) {
                    // loop forever
                }
            }
        }
    }

    private void parseArgs(String[] args) {
        if (args == null || args.length < 1) {
            usage();
            return;
        }

        List<String> argList = new ArrayList<String>(Arrays.asList(args));

        // The last arg should be subject
        // get the subject and remove it from args
        subject = argList.remove(argList.size() - 1);

        // Anything left is flags + args
        Iterator<String> it = argList.iterator();
        while (it.hasNext()) {
            String arg = it.next();
            switch (arg) {
                case "-s":
                case "--server":
                    if (!it.hasNext()) {
                        usage();
                    }
                    it.remove();
                    url = it.next();
                    it.remove();
                    continue;
                case "-q":
                case "--qgroup":
                    if (!it.hasNext()) {
                        usage();
                    }
                    it.remove();
                    qgroup = it.next();
                    it.remove();
                    continue;
                default:
                    System.err.printf("Unexpected token: '%s'\n", arg);
                    usage();
                    break;
            }
        }
    }

    /**
     * Subscribes to a subject.
     * 
     * @param args the subject, cluster info, and subscription options
     */
    public static void main(String[] args) {
        try {
            new Subscriber(args).run();
        } catch (IOException | TimeoutException e) {
           // log.error("Couldn't create Subscriber", e);
        System.out.println("Couldn't create Subscriber"+ e);
            System.exit(-1);
        }
        System.exit(0);
    }

}

Reference - https://nats.io/

Comments

Popular posts from this blog

Java Coding Best Practices

Docker to ELK