Receiving messages

Every request-response message has an associated message path. A response to a message uses the same path as the original message.

A message path looks like a topic path, made up of path segments separated by slashes:

status/rockets/fuel-messages

As a convention, you can send request-response messages related to a topic path on the matching message path (for example, you could send messages about the /foo/bar topic using the /foo/bar message path).

Topic paths and message paths are completely separate.

Messages do not affect topic state. Subscribers to a topic will not receive messages to the matching message path. Security permissions for message paths and topic paths are unrelated.

Registering a handler

A client has to register a request handler on a message path to receive messages. Each client can have only one handler for each message path.

Java and Android
// Register a request stream for JSON requests and JSON responses on the
// message path. Each callback below only logs, except onRequest, which
// also sends a reply.
session.feature(Messaging.class).setRequestStream(
    "my/topic/path",
    JSON.class,
    JSON.class,
    new Messaging.RequestStream<JSON, JSON>() {
        /** Logs the incoming request and answers it with a fixed JSON greeting. */
        @Override
        public void onRequest(String path, JSON request, Responder<JSON> responder) {
            LOG.info("Received request:: " + request.toJsonString());
            final JSON reply = Diffusion.dataTypes().json()
                .fromJsonString("{\"greetings\": \"stranger\"}");
            responder.respond(reply);
        }

        /** Logs the reason the stream reported an error. */
        @Override
        public void onError(ErrorReason errorReason) {
            LOG.info("on error: " + errorReason);
        }

        /** Logs that the stream has been closed. */
        @Override
        public void onClose() {
            LOG.info("on close");
        }
    });
JavaScript
// Callbacks for a request stream: answer requests, report errors, report closure.
const request_stream = {
    // Logs the request received on `path` and responds with a JSON value.
    onRequest(path, request, responder) {
        console.log(`Received request for path "${path}": ${JSON.stringify(request.get())}`);

        const jsonType = diffusion.datatypes.json();
        const reply = jsonType.from({ hello: 'world' });
        responder.respond(reply, jsonType);
    },

    // Logs an error reported for the stream.
    onError(error) {
        console.log(`An error has occurred: ${error}`);
    },

    // Logs that the stream has been closed.
    onClose() {
        console.log('Request stream is closed');
    },
};

...

// Register the request stream on the message path, passing the JSON data type
// as the third argument.
session.messages.setRequestStream(message_path, request_stream, diffusion.datatypes.json());
.NET
// Create the request stream handler and register it on the message path.
var requestStream = new JSONRequestStreamHandler();
session.Messaging.SetRequestStream(messagePath, requestStream);
// Report that the stream was set.
WriteLine($"Request stream has been successfully set.");
C
/*
 * Request stream callback.
 *
 * Prints the incoming request as a JSON string (when the conversion succeeds)
 * and responds with a fixed JSON greeting. Always returns HANDLER_SUCCESS.
 */
static int on_request_stream(
        SESSION_T *session,
        const char *request_path,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        /* Render the request for logging; nothing is printed if conversion fails. */
        char *json_text;
        DIFFUSION_API_ERROR error;
        if (to_diffusion_json_string(request, &json_text, &error)) {
                printf("Received request: %s\n\n", json_text);
                free(json_text);
        }

        /* Write the reply into a buffer, send it, then release the buffer. */
        BUF_T *reply = buf_create();
        write_diffusion_json_value("{\"greetings\": \"stranger\"}", reply);
        diffusion_respond_to_request(session, handle, reply, NULL);
        buf_free(reply);

        return HANDLER_SUCCESS;
}


...


// Request stream whose only configured callback is on_request_stream.
DIFFUSION_REQUEST_STREAM_T request_stream = {
        .on_request = on_request_stream
};

// Register the stream on the message path with JSON passed for both datatype arguments.
set_request_stream(session, message_path, DATATYPE_JSON, DATATYPE_JSON, &request_stream);
Python
def callback(request: str, **kwargs) -> diffusion.datatypes.JSON:
    """Log an incoming request and reply with a fixed JSON greeting.

    Args:
        request: The received request value.
        **kwargs: Additional keyword arguments passed by the caller; unused here.

    Returns:
        A JSON value containing ``{"greetings": "stranger"}``.
    """
    print(f"Received request {request}")
    # Use the `diffusion.datatypes` package, the same one the handler
    # registration references for its request and response types.
    return diffusion.datatypes.JSON({"greetings": "stranger"})

...

# Wrap the callback in a request handler with JSON request and response types.
handler = diffusion.messaging.RequestHandler(
    callback,
    request_type=diffusion.datatypes.JSON,
    response_type=diffusion.datatypes.JSON
)
# Register the handler as a stream handler on the message path.
session.messaging.add_stream_handler(message_path, handler=handler, addressed=True)
print("Request stream has been successfully set.")
Apple
/// Request stream delegate that logs each JSON request and replies with a
/// fixed JSON greeting.
class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
    /// Logs the received request and responds with `{"greetings": "stranger"}`.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveRequestWith json: PTDiffusionJSON,
                         responder: PTDiffusionResponder) {
        // Swift's print() does not expand "%@" placeholders, so interpolate instead.
        print("Received request \(json)")
        // The JSON text is a constant literal, so a parse failure here would be a
        // programming error; `try!` is kept deliberately.
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    /// Logs the error that caused the stream to fail.
    func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
        print("Request stream failed with error \(error.localizedDescription)")
    }

    /// Logs that the stream has closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Request stream is now closed")
    }
}

...

// Create the delegate and build a JSON request stream around it.
let stream_delegate = JSONRequestStreamDelegate()
let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
// Set the stream for the message path.
session.messaging.setRequestStream(request_stream, forPath: message_path)
print("Message request stream has been set.")
Java and Android
// Register a request handler for JSON requests and JSON responses on the
// message path. Each callback below only logs, except onRequest, which
// also sends a reply.
session.feature(Messaging.class).addRequestHandler(
    "my/topic/path",
    JSON.class,
    JSON.class,
    new Messaging.RequestHandler<JSON, JSON>() {
        /** Logs the incoming request and answers it with a fixed JSON greeting. */
        @Override
        public void onRequest(JSON request, RequestContext context,
            Responder<JSON> responder) {
            LOG.info("Received request:: " + request.toJsonString());
            final JSON reply = Diffusion.dataTypes().json()
                .fromJsonString("{\"greetings\": \"stranger\"}");
            responder.respond(reply);
        }

        /** Logs the reason the handler reported an error. */
        @Override
        public void onError(ErrorReason errorReason) {
            LOG.info("on error: " + errorReason);
        }

        /** Logs that the handler has been closed. */
        @Override
        public void onClose() {
            LOG.info("on close");
        }
    });
JavaScript
// Callbacks for a request handler: answer requests, report errors, report closure.
const request_handler = {
    // Logs the request with its path and originating session, then responds
    // with a JSON value.
    onRequest(request, context, responder) {
        console.log(`Received request on path "${context.path}" from session ${context.sessionId}: ${JSON.stringify(request.get())}`);

        const jsonType = diffusion.datatypes.json();
        const reply = jsonType.from({ hello: 'world' });
        responder.respond(reply, jsonType);
    },

    // Logs an error reported for the handler.
    onError(error) {
        console.log(`Message path failed with error: ${error}`);
    },

    // Logs that the handler has been closed.
    onClose() {
        console.log('Message path is now closed');
    },
};

...

// Register the request handler defined above on the message path and wait for
// the registration to complete.
await session.messages.addRequestHandler(message_path, request_handler);
.NET
/// <summary>
/// Request handler that logs each JSON request and replies with a fixed JSON greeting.
/// </summary>
private class JSONRequestHandler : IRequestHandler<IJSON, IJSON>
{
    /// <summary>Logs that the handler has been closed.</summary>
    public void OnClose()
    {
        WriteLine("Message path is now closed.");
    }

    /// <summary>Logs the description of the reported error.</summary>
    public void OnError(ErrorReason errorReason)
    {
        WriteLine($"Message path failed with error: {errorReason.Description}.");
    }

    /// <summary>Logs the request and responds with a fixed JSON value.</summary>
    public void OnRequest(IJSON json, IRequestContext context, IResponder<IJSON> responder)
    {
        WriteLine($"Received request: {json.ToJSONString()}.");
        responder.Respond(
            Diffusion.DataTypes.JSON.FromJSONString("{\"greetings\": \"stranger\"}"));
    }
}

...

// Create the handler and register it on the message path, awaiting completion.
var handler = new JSONRequestHandler();
await session.Messaging.AddRequestHandlerAsync(messagePath, handler);
// Report which message path was registered.
WriteLine($"Message path {messagePath} has been successfully registered.");
C
/*
 * Callback invoked once the request handler is active.
 * Nothing needs to be done here, so it simply reports success.
 */
static int on_active(
        SESSION_T *session,
        const char *path,
        const DIFFUSION_REGISTRATION_T *registered_handler)
{
        /* The parameters are intentionally unused. */
        (void)session;
        (void)path;
        (void)registered_handler;

        return HANDLER_SUCCESS;
}

/*
 * Request handler callback.
 *
 * Prints the sender's session ID together with the request rendered as a JSON
 * string (when the conversion succeeds) and responds with a fixed JSON
 * greeting. Always returns HANDLER_SUCCESS.
 *
 * The handler is registered with JSON request and response datatypes, so the
 * request is read and the response written with the JSON helpers rather than
 * the string ones.
 */
static int on_request(
        SESSION_T *session,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_REQUEST_CONTEXT_T *request_context,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        /* Initialise to NULL so a failed conversion never leaves a dangling pointer. */
        DIFFUSION_API_ERROR api_error;
        char *request_val = NULL;
        bool success = to_diffusion_json_string(request, &request_val, &api_error);

        SESSION_ID_T *session_id = diffusion_request_context_get_session_id(request_context);
        char *session_id_str = session_id_to_string(session_id);

        /* Only print and free the request text if it was actually produced. */
        if (success) {
                printf("Request received from %s: %s\n", session_id_str, request_val);
                free(request_val);
        }

        /* Write the JSON reply into a buffer and send it to the requester. */
        BUF_T *response_buf = buf_create();
        write_diffusion_json_value("{\"greetings\": \"stranger\"}", response_buf);
        diffusion_respond_to_request(session, handle, response_buf, NULL);

        buf_free(response_buf);
        session_id_free(session_id);
        free(session_id_str);

        return HANDLER_SUCCESS;
}


...


// Request handler with JSON request and response datatypes and the
// on_active / on_request callbacks.
DIFFUSION_REQUEST_HANDLER_T request_handler = {
        .request_datatype = DATATYPE_JSON,
        .response_datatype = DATATYPE_JSON,
        .on_active = on_active,
        .on_request = on_request
};

// Registration parameters: the message path and the handler to attach to it.
ADD_REQUEST_HANDLER_PARAMS_T parameters = {
        .path = message_path,
        .request_handler = &request_handler
};

// Register the handler with the session.
add_request_handler(session, parameters);
Python
def callback(request: str, **kwargs) -> diffusion.datatypes.JSON:
    """Log an incoming request and reply with a fixed JSON greeting.

    Args:
        request: The received request value.
        **kwargs: Additional keyword arguments passed by the caller; unused here.

    Returns:
        A JSON value containing ``{"greetings": "stranger"}``.
    """
    print(f"Received request {request}")
    # Use the `diffusion.datatypes` package, the same one the handler
    # registration references for its request and response types.
    return diffusion.datatypes.JSON({"greetings": "stranger"})

...

# Wrap the callback in a request handler with JSON request and response types.
handler = diffusion.messaging.RequestHandler(
    callback,
    request_type=diffusion.datatypes.JSON,
    response_type=diffusion.datatypes.JSON
)
# Register the handler on the message path and wait for the call to complete.
await session.messaging.add_request_handler(message_path, handler=handler)
Apple
/// Request handler delegate that logs each JSON request and replies with a
/// fixed JSON greeting.
class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
    /// Logs the received request and responds with `{"greetings": "stranger"}`.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didReceiveRequestWith json: PTDiffusionJSON,
                                        context: PTDiffusionRequestContext,
                                        responder: PTDiffusionResponder) {
        // Swift's print() does not expand "%@" placeholders, so interpolate instead.
        print("Received request: \(json)")
        // The JSON text is a constant literal, so a parse failure here would be a
        // programming error; `try!` is kept deliberately.
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    /// Logs that the registration has closed.
    func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
        print("Message path is now closed")
    }

    /// Logs the error that caused the registration to fail.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didFailWithError error: Error) {
        print("Message path failed with error: \(error.localizedDescription)")
    }
}

...

// Create the delegate and build a JSON request handler around it.
let handler = JSONRequestHandler()
let request_stream = PTDiffusionJSON.requestHandler(with: handler)
// Register the handler on the message path; the completion closure receives
// either a registration or an error.
session.messaging.add(request_stream, forPath: message_path) { (registration, error) in
    // Report a failed registration instead of unconditionally claiming success.
    if let error = error {
        print("Message path \(message_path) failed to register: \(error.localizedDescription)")
        return
    }
    // Swift's print() does not expand "%@" placeholders, so interpolate instead.
    print("Message path \(message_path) has been successfully registered.")
}