4. Subscriptions

Messages published to a topic on an AMPS server are available to other clients via a subscription. Before messages can be received, a client must subscribe to one or more topics on the AMPS server so that the server will begin sending messages to the client. The server will continue sending messages to the client until the client unsubscribes, or the client disconnects. With content filtering, the AMPS server will limit the messages sent to only those messages that match a client-supplied filter. In this chapter, you will learn how to subscribe, unsubscribe, and supply filters for messages using the AMPS JavaScript client.

Subscribing to a Topic

The AMPS client makes it simple to subscribe to a topic. You call Client.subscribe() with the message handler, the topic to subscribe to and the parameters for the subscription. The client submits the subscription to AMPS and returns a Promise object that resolves with the subscription identifier. Received messages are asynchronously delivered to the message handler function. Here is a short example:

// let's create a Client that is properly connected to an AMPS server.
var client = new amps.Client('test');

client.connect('wss://127.0.0.1:9007/amps/json').then(function() {
    /**
    * Here we subscribe to the topic messages. We do not provide
    * a filter, so AMPS does not content-filter the topic.
    * We return the Promise object here, so that .then()
    * methods can be properly chained.
    */
    return client.subscribe(
        /**
        * Within this handler, we process messages. In this case,
        * we simply print the contents of the message.
        */
        function(message) { console.log(message.data); },

        // The topic
        'messages'
    );
}).then(function(subId) { console.log('subId: ', subId); });

Example 4.1: Basic AMPS subscription

AMPS creates an asynchronous subscription that receives messages and calls the message handler only when a new message arrives. This means that the client application as a whole can continue to receive messages while you are doing processing work: incoming messages are stacked in the execution queue until the handler can run.

The simple method described above is one of the convenience methods that the AMPS JavaScript client provides for the most common forms of AMPS commands. The client also provides an interface that gives you precise control over the command. Using that interface, the example above becomes:

// let's create a Client that is properly connected to an AMPS server.
var client = new amps.Client('test');

client.connect('wss://127.0.0.1:9007/amps/json').then(function() {
    /**
    * We execute the Command using the execute() method.
    * It returns the Promise object here, so that .then()
    * methods can be properly chained.
    */
    return client.execute(
        /**
        * Here we create a Command object for the subscribe
        * command, specifying the topic messages. We do not
        * provide a filter, so AMPS does not content-filter
        * the topic.
        */
        new amps.Command('subscribe').topic('messages'),

        /**
        * Within this handler, we process messages. In this case,
        * we simply print the contents of the message.
        */
        function(message) { console.log(message.data); }
    );
}).then(function(subId) { console.log('subId: ', subId); });

Example 4.2: Subscribing to a topic using a command

The Command interface allows you to precisely customize the commands you send to AMPS. For flexibility and ease of maintenance, 60East recommends using the Command interface (rather than a named method) for any command that will receive messages from AMPS. For publishing messages, there can be a slight performance advantage to using the named commands where possible.

Unsubscribing

With asynchronous message processing, once a subscription is successfully made, messages begin flowing to the message handler function, and the Promise returned by Client.subscribe() resolves with a unique string that identifies the subscription. A Client can have any number of active subscriptions, and this string is used to refer to the particular subscription we have made here. For example, to unsubscribe, we simply pass in this identifier:

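// A simple handler that prints the data of each message.
var onMessagePrinter = function(message) { console.log(message.data); };

var client = new amps.Client('exampleClient');

client
    .connect('wss://localhost:9000/amps/json')
    .then(function() {
        return client.subscribe(onMessagePrinter, 'messages');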
    })
    .then(function(subId) {
        // when the program is done with the subscription, unsubscribe
        return client.unsubscribe(subId);
    })
    .then(function() { console.log('Unsubscribed'); });

Example 4.3: Unsubscribing from a topic

In this example, as in the previous section, we use the Client.subscribe() method to create a subscription to the messages topic. When our application is done listening to this topic, it unsubscribes by passing in the subId that the subscribe() Promise resolved with. After the subscription is removed, no more messages will flow into our onMessagePrinter function.

The unsubscribe() method also accepts a comma-delimited list of subscription identifiers, or the keyword all to remove every subscription for the client.
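
As a minimal sketch of the comma-delimited form, the helper below joins several subscription identifiers and passes them to unsubscribe() in a single call. The helper name unsubscribeMany and the identifiers are hypothetical, and fakeClient is a stand-in object used only so that the snippet runs without a server. With a real amps.Client, the call shape is assumed to be the same.

```javascript
// Hypothetical helper: joins subscription identifiers into the
// comma-delimited form described above and unsubscribes them together.
function unsubscribeMany(client, subIds) {
    return client.unsubscribe(subIds.join(','));
}

// Stand-in for a connected amps.Client (an assumption for illustration):
// it only records what unsubscribe() was called with.
var calls = [];
var fakeClient = {
    unsubscribe: function(ids) {
        calls.push(ids);
        return Promise.resolve();
    }
};

// Remove three specific subscriptions with one call.
unsubscribeMany(fakeClient, ['sub-1', 'sub-2', 'sub-3']);

// The keyword form removes every subscription for the client.
fakeClient.unsubscribe('all');
```

Collecting the identifiers that each subscribe() Promise resolves with into an array makes it straightforward to release a group of subscriptions together.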

Understanding Promises and Message Handlers

The first time a command causes an instance of the Client to connect to AMPS, the client creates a WebSocket connection that runs asynchronously. This asynchronous connection is responsible for processing incoming messages from AMPS, which includes both messages that contain data and acknowledgments from the server.

When you execute a command on the AMPS client, the execute() method creates and returns a Promise object that typically waits for an acknowledgment from the server and then resolves with the id of the command. (The exception to this is Client.publish(). For performance, the publish command does not wait for an acknowledgment from the server before returning.)
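
The ordering described above can be illustrated with a small sketch. The fakeClient object here is a stand-in, not the AMPS client: its execute() returns a Promise immediately and resolves it later with a made-up identifier, which is how the real client is assumed to behave once the acknowledgment arrives.

```javascript
// Stand-in for a connected amps.Client (an assumption for illustration).
// execute() returns a Promise at once and resolves it later, ignoring
// the command and handler it is given.
var fakeClient = {
    execute: function(command, handler) {
        return new Promise(function(resolve) {
            setTimeout(function() { resolve('cmd-1'); }, 0);
        });
    }
};

var events = [];

var pendingAck = fakeClient
    .execute({ command: 'subscribe', topic: 'messages' }, function() {})
    .then(function(commandId) {
        // Runs only after the Promise resolves.
        events.push('acknowledged: ' + commandId);
        return commandId;
    });

// This line runs before the Promise resolves.
events.push('execute() returned');
```

Code that depends on the command having been acknowledged belongs in the .then() callback, while code placed after the execute() call runs right away.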

Keep the following considerations in mind when writing message handlers.

For maximum performance, do as little work in the message handler as possible. For example, if you use the contents of the message to perform an extensive calculation, a message handler that passes data into a WebWorker instance will typically perform better than a message handler that does this calculation in place.

While your message handler is running, the connection that calls your message handler is no longer receiving messages. This makes it easier to write a message handler, because you know that no other messages are arriving from the same subscription. However, this also means that you cannot use the same client that called the message handler to send commands to AMPS. Acknowledgments from AMPS cannot be processed, and your application will block while waiting for the acknowledgment. Instead, enqueue the command in a work queue to be processed by a separate worker, or use a different client object to submit the commands.
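
The first consideration can be sketched as a handler that only records the message and leaves the real work to a later step. The names pending, drainQueue, and expensiveCalculation are hypothetical, and the deliveries are simulated so that the snippet runs without a server.

```javascript
// Work queue filled by the message handler and drained later.
var pending = [];
var processed = [];

// The handler only records the message data and returns immediately.
function onMessage(message) {
    pending.push(message.data);
}

// Placeholder for application-specific work (an assumption).
function expensiveCalculation(data) {
    return data.value * 2;
}

// Hypothetical worker step: drains the queue outside the handler.
// In a browser, this could hand each item to a Web Worker instead.
function drainQueue() {
    while (pending.length > 0) {
        var data = pending.shift();
        processed.push(expensiveCalculation(data));
    }
}

// Simulate two deliveries, then process them in a separate step.
onMessage({ data: { value: 1 } });
onMessage({ data: { value: 2 } });
drainQueue();
```

In an application, drainQueue() would typically be scheduled (for example with setTimeout) rather than called inline, so that the handler returns before the work starts.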

Understanding Messages and Commands

So far, we have seen that subscribing to a topic involves working with objects of amps.Message type. A Message represents a single message from an AMPS server, while an amps.Command is sent to a server. Commands are sent and messages are received for every client/server operation in AMPS.

Header Properties

There are two parts of each message in AMPS: a set of headers that provide metadata for the message, and the data that the message contains. Every AMPS message has one or more header fields defined. The precise headers present depend on the type and context of the message. Many header fields are defined, but only a few are used in any given message. For each header field, the Message object contains a distinct method that allows for retrieval of that field. For example, Message.header.commandId() corresponds to the CommandId header field, Message.header.batchSize() corresponds to the BatchSize header field, and so on. For more information on these header fields, consult the AMPS User Guide and AMPS Command Reference.

The Message class represents messages received from an AMPS server. To create a message to be sent, use the Command class. To work with header fields, a Command provides a set of <propertyName>() methods that work as both setters and getters, so that property calls can be chained when creating a new command.

Message Data

The data of a received message is available in the Message.data property, which contains the parsed data of the message.

The AMPS JavaScript client contains a collection of helper classes for working with message types that are specific to AMPS (for example, JSON, FIX, NVFIX, and AMPS composite message types). You can replace default parsers to implement required specific behavior, as well as add new helpers.

Advanced Messaging Support

AMPS allows your application to subscribe to topics even when you do not know their exact names, and to provide a filter that runs on the server to limit the messages received by your application.

Regex Subscriptions

Regular Expression (Regex) subscriptions allow a regular expression to be supplied in the place of a topic name. When you supply a regular expression, it is as if a subscription is made to every topic that matches your expression, including topics that do not yet exist at the time of creating the subscription.

To use a regular expression, simply supply the regular expression in place of the topic name in the subscribe() call. For example:

client.subscribe(function(message) { ... }, 'orders.*');

Example 4.4: Regex topic subscription

In this example, messages on topics such as orders-north-america and orders-europe would match the regular expression, and those messages would all be sent to the message handler function. Within the handler, you can use the message.t property to determine the actual topic of the message sent to the function.
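
The sketch below approximates this behavior locally. It assumes that the expression is matched against the whole topic name, which is an assumption about server behavior rather than something this chapter states, and it uses a JavaScript RegExp in place of the matching that AMPS performs on the server. The invoices topic and the message contents are made up for illustration.

```javascript
// Local approximation of the topic match, assuming the expression must
// match the whole topic name. AMPS evaluates the expression on the
// server, so this is only an illustration.
var topicPattern = new RegExp('^(?:orders.*)$');

var counts = {};

// Handler that tallies messages by their actual topic, read from
// message.t as described above.
function onOrder(message) {
    counts[message.t] = (counts[message.t] || 0) + 1;
}

// Simulated deliveries; a real subscription would only deliver
// messages whose topic matched on the server.
var incoming = [
    { t: 'orders-north-america', data: { id: 1 } },
    { t: 'orders-europe', data: { id: 2 } },
    { t: 'invoices', data: { id: 3 } }
];

incoming.forEach(function(message) {
    if (topicPattern.test(message.t)) {
        onOrder(message);
    }
});
```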

Content Filtering

One of the most powerful features of AMPS is content filtering. With content filtering, filters based on message content are applied at the server, so that your application and the network are not burdened with messages that are of no interest to your application. For example, if your application is only displaying messages from a particular user, you can send a content filter to the server so that only messages from that particular user are sent to the client.

To apply a content filter to a subscription, simply pass it into the Client.subscribe() call or use the filter() method to add a filter to the Command:

client.subscribe(
    function(msg) { console.log('Mom says: ', msg.data); },
    'letters',         // Topic
    '/sender="mom"',   // Filter
    5000               // Timeout (optional)
);

Example 4.5: Using content filtering

In this example, we have passed in the content filter /sender="mom". This causes the server to send us only messages from the letters topic that have a sender field equal to mom.

For example, the AMPS server will send the following message, where /sender is mom:

{
    "sender" : "mom",
    "text" : "Happy Birthday!",
    "reminder" : "Call me Thursday!"
}

The AMPS server will not send a message with a different /sender value:

{
    "sender" : "henry dave",
    "text" : "Things do not change; we change."
}
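
To make the selection concrete, the following sketch applies an equivalent test locally to the two messages above. The filter is evaluated by the AMPS server, not by the client, so matchesFilter is only a hypothetical stand-in that mimics which messages would be delivered.

```javascript
// Local stand-in for the server-side filter /sender="mom".
function matchesFilter(data) {
    return data.sender === 'mom';
}

// The two messages shown above, as published to the topic.
var published = [
    { sender: 'mom', text: 'Happy Birthday!', reminder: 'Call me Thursday!' },
    { sender: 'henry dave', text: 'Things do not change; we change.' }
];

// Only the messages that pass the filter would reach the handler.
var delivered = published.filter(matchesFilter);

delivered.forEach(function(data) {
    console.log('Mom says: ', data.text);
});
```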

Updating the Filter on a Subscription

AMPS allows you to update the filter on a subscription. When you replace a filter on the subscription, AMPS immediately begins sending only messages that match the updated filter. Notice that if the subscription was entered with a command that includes a SOW query, using the replace option can re-issue the SOW query (as described in the AMPS User Guide).

To update the filter on a subscription, you create a subscribe command. Set the subscription ID on the Command to the identifier of the existing subscription, and include the replace option on the Command. When you send the Command, AMPS atomically replaces the filter and sends messages that match the updated filter from that point forward.

client
    .connect('wss://localhost:9000/amps/json')
    .then(function() {

        // Original subscription
        return client.execute(
            new amps.Command('subscribe')
                .topic('orders')
                .filter('/id > 100'),
            onMessagePrinter
        );

    })
    .then(function(subId) {

        // replace the subscription with a new filter value
        return client.execute(
            new amps.Command('subscribe')
                .topic('orders')
                .filter('/id < 5')
                .subId(subId)
                .options('replace'),
            onMessagePrinter
        );

    });

Example 4.6: Update the filter on a subscription
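
The replace step from Example 4.6 can be wrapped in a small helper, sketched below. The helper name replaceFilter is hypothetical. The Command constructor is passed in as a parameter so that the sketch runs without the amps module: FakeCommand and fakeClient are stand-ins that only record what they are given, and in an application you would pass amps.Command and a connected client instead.

```javascript
// Hypothetical helper wrapping the replace pattern from Example 4.6.
// CommandType is injected so the sketch runs without the amps module;
// in an application it would be amps.Command.
function replaceFilter(client, CommandType, subId, topic, filter, handler) {
    return client.execute(
        new CommandType('subscribe')
            .topic(topic)
            .filter(filter)
            .subId(subId)
            .options('replace'),
        handler
    );
}

// Minimal stand-ins (assumptions for illustration only).
function FakeCommand(name) { this.fields = { command: name }; }
['topic', 'filter', 'subId', 'options'].forEach(function(field) {
    FakeCommand.prototype[field] = function(value) {
        this.fields[field] = value;
        return this;
    };
});

var sent = [];
var fakeClient = {
    execute: function(command, handler) {
        sent.push(command.fields);
        return Promise.resolve(command.fields.subId);
    }
};

// Replace the filter on the (made-up) subscription 'sub-1'.
replaceFilter(fakeClient, FakeCommand, 'sub-1', 'orders', '/id < 5',
    function(message) { console.log(message.data); });
```

Keeping the topic, subscription identifier, and handler together in one call reduces the chance of replacing a filter on the wrong subscription.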

Next Steps

At this point, you are able to build AMPS programs in JavaScript that publish and subscribe to AMPS topics. For an AMPS application to be truly robust, it needs to be able to handle the errors and disconnections that occur in any distributed system. In the next chapter, we will take a closer look at error handling and recovery, and how you can use it to make your application ready for the real world.