Receiving messages
Every request-response message has an associated message path. A response to a message uses the same path as the original message.
A message path looks like a topic path, made up of path segments separated by slashes:
status/rockets/fuel-messages
As a convention, you can send request-response messages related to a topic path on the matching message path (for example, you could send messages about the /foo/bar topic using the /foo/bar message path).
Topic paths and message paths are completely separate.
Messages do not affect topic state. Subscribers to a topic will not receive messages to the matching message path. Security permissions for message paths and topic paths are unrelated.
Registering a handler
A client has to register a request handler on a message path to receive messages. Each client can have only one handler for each message path.
Java and Android
// Set a request stream for the message path; JSON is used for both the
// request type and the response type.
session.feature(Messaging.class)
    .setRequestStream(
        "my/topic/path",
        JSON.class,
        JSON.class,
        new Messaging.RequestStream<JSON, JSON>() {
            /** Logs the incoming request and answers it with a fixed JSON value. */
            @Override
            public void onRequest(String path, JSON request, Responder<JSON> responder) {
                LOG.info("Received request:: " + request.toJsonString());
                final JSON reply = Diffusion.dataTypes().json()
                    .fromJsonString("{\"greetings\": \"stranger\"}");
                responder.respond(reply);
            }

            /** Logs the reason passed to the error callback. */
            @Override
            public void onError(ErrorReason errorReason) {
                LOG.info("on error: " + errorReason);
            }

            /** Logs the close callback. */
            @Override
            public void onClose() {
                LOG.info("on close");
            }
        });
JavaScript
const request_stream = {
onRequest: (path, request, responder) => {
console.log(`Received request for path "${path}": ${JSON.stringify(request.get())}`);
const jsonDataType = diffusion.datatypes.json();
const jsonValue = jsonDataType.from({
hello : 'world'
});
responder.respond(jsonValue, jsonDataType);
},
onError: (error) => {
console.log(`An error has occurred: ${error}`);
},
onClose: () => {
console.log('Request stream is closed');
},
}
...
session.messages.setRequestStream(message_path, request_stream, diffusion.datatypes.json());
.NET
// Create the stream handler and set it as the request stream for the message path.
JSONRequestStreamHandler requestStream = new JSONRequestStreamHandler();
session.Messaging.SetRequestStream(messagePath, requestStream);

// The message has no placeholders, so a plain string literal is enough.
WriteLine("Request stream has been successfully set.");
C
/*
 * Request stream callback: prints the request as JSON text when the
 * conversion succeeds, then replies with a fixed JSON value.
 */
static int on_request_stream(
    SESSION_T *session,
    const char *request_path,
    DIFFUSION_DATATYPE request_datatype,
    const DIFFUSION_VALUE_T *request,
    const DIFFUSION_RESPONDER_HANDLE_T *handle,
    void *context)
{
    DIFFUSION_API_ERROR api_error;
    char *request_text;

    /* Only log (and release) the text when the conversion reports success. */
    if (to_diffusion_json_string(request, &request_text, &api_error)) {
        printf("Received request: %s\n\n", request_text);
        free(request_text);
    }

    /* Build the reply, send it, then release the buffer. */
    BUF_T *reply = buf_create();
    write_diffusion_json_value("{\"greetings\": \"stranger\"}", reply);
    diffusion_respond_to_request(session, handle, reply, NULL);
    buf_free(reply);

    return HANDLER_SUCCESS;
}
...
/* Only the request callback is populated; JSON is given for both datatypes. */
DIFFUSION_REQUEST_STREAM_T request_stream = {
    .on_request = on_request_stream
};
set_request_stream(session, message_path, DATATYPE_JSON, DATATYPE_JSON, &request_stream);
Python
def callback(request: str, **kwargs) -> "diffusion.datatypes.JSON":
    """Log the incoming request and return a fixed JSON reply.

    NOTE(review): `request` keeps the original `str` annotation; with
    `request_type=diffusion.datatypes.JSON` the runtime type may differ - confirm.
    """
    print(f"Received request {request}")
    # `diffusion.datatypes` (plural) matches the module used for the handler types below.
    return diffusion.datatypes.JSON({"greetings": "stranger"})
...
# The handler wraps the callback and declares JSON for both request and response.
handler = diffusion.messaging.RequestHandler(
    callback,
    request_type=diffusion.datatypes.JSON,
    response_type=diffusion.datatypes.JSON
)
session.messaging.add_stream_handler(message_path, handler=handler, addressed=True)
print("Request stream has been successfully set.")
Apple
/// Request stream delegate that logs each JSON request and replies with a fixed JSON value.
class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
    /// Logs the incoming request and responds with a fixed JSON value.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveRequestWith json: PTDiffusionJSON,
                         responder: PTDiffusionResponder) {
        // Swift's `print` does not expand `%@` placeholders; use interpolation instead.
        print("Received request \(json)")
        // `try!` is tolerable here only because the JSON text is a fixed, well-formed literal.
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    /// Logs the error reported for the stream.
    func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
        print("Request stream failed with error \(error.localizedDescription)")
    }

    /// Logs that the stream has closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Request stream is now closed")
    }
}
...
// NOTE(review): if the request stream holds its delegate weakly, keep a strong
// reference to `stream_delegate` for as long as the stream is set - confirm.
let stream_delegate = JSONRequestStreamDelegate()
let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
session.messaging.setRequestStream(request_stream, forPath: message_path)
print("Message request stream has been set.")
Java and Android
// Add a request handler for the message path; JSON is used for both the
// request type and the response type.
session.feature(Messaging.class)
    .addRequestHandler(
        "my/topic/path",
        JSON.class,
        JSON.class,
        new Messaging.RequestHandler<JSON, JSON>() {
            /** Logs the incoming request and answers it with a fixed JSON value. */
            @Override
            public void onRequest(JSON request, RequestContext context,
                                  Responder<JSON> responder) {
                LOG.info("Received request:: " + request.toJsonString());
                final JSON reply = Diffusion.dataTypes().json()
                    .fromJsonString("{\"greetings\": \"stranger\"}");
                responder.respond(reply);
            }

            /** Logs the reason passed to the error callback. */
            @Override
            public void onError(ErrorReason errorReason) {
                LOG.info("on error: " + errorReason);
            }

            /** Logs the close callback. */
            @Override
            public void onClose() {
                LOG.info("on close");
            }
        });
JavaScript
const request_handler = {
onRequest: (request, context, responder) => {
console.log(`Received request on path "${context.path}" from session ${context.sessionId}: ${JSON.stringify(request.get())}`);
const jsonDataType = diffusion.datatypes.json();
const jsonValue = jsonDataType.from({
hello : 'world'
});
responder.respond(jsonValue, jsonDataType);
},
onError: (error) => {
console.log(`Message path failed with error: ${error}`);
},
onClose: () => {
console.log('Message path is now closed');
}
};
...
await session.messages.addRequestHandler(message_path, request_stream);
.NET
/// <summary>
/// Request handler that logs each JSON request and replies with a fixed JSON value.
/// </summary>
private class JSONRequestHandler : IRequestHandler<IJSON, IJSON>
{
    /// <summary>Logs the request and responds with a fixed JSON value.</summary>
    public void OnRequest(IJSON json, IRequestContext context, IResponder<IJSON> responder)
    {
        WriteLine($"Received request: {json.ToJSONString()}.");
        responder.Respond(Diffusion.DataTypes.JSON.FromJSONString("{\"greetings\": \"stranger\"}"));
    }

    /// <summary>Logs the description of the reported error reason.</summary>
    public void OnError(ErrorReason errorReason)
    {
        WriteLine($"Message path failed with error: {errorReason.Description}.");
    }

    /// <summary>Logs that the handler has been closed.</summary>
    public void OnClose()
    {
        WriteLine("Message path is now closed.");
    }
}
...
// Register the handler for the message path and report once the call completes.
JSONRequestHandler handler = new JSONRequestHandler();
await session.Messaging.AddRequestHandlerAsync(messagePath, handler);
WriteLine($"Message path {messagePath} has been successfully registered.");
C
/* Called when the request handler registration becomes active. */
static int on_active(
    SESSION_T *session,
    const char *path,
    const DIFFUSION_REGISTRATION_T *registered_handler)
{
    // request handler is now active
    return HANDLER_SUCCESS;
}

/*
 * Request handler callback: prints the sender's session id together with the
 * request as JSON text, then replies with a fixed JSON value.
 */
static int on_request(
    SESSION_T *session,
    DIFFUSION_DATATYPE request_datatype,
    const DIFFUSION_VALUE_T *request,
    const DIFFUSION_REQUEST_CONTEXT_T *request_context,
    const DIFFUSION_RESPONDER_HANDLE_T *handle,
    void *context)
{
    /*
     * The handler is registered with DATATYPE_JSON for requests and responses,
     * so the request is converted with the JSON function rather than the
     * string one. The pointer starts as NULL and is only used when the
     * conversion reports success.
     */
    char *request_json = NULL;
    bool converted = to_diffusion_json_string(request, &request_json, NULL);

    SESSION_ID_T *session_id = diffusion_request_context_get_session_id(request_context);
    char *session_id_str = session_id_to_string(session_id);

    if (converted) {
        printf("Request received from %s: %s\n", session_id_str, request_json);
        free(request_json);
    }

    /* Reply with a JSON value, matching the registered response datatype. */
    BUF_T *response_buf = buf_create();
    write_diffusion_json_value("{\"greetings\": \"stranger\"}", response_buf);
    diffusion_respond_to_request(session, handle, response_buf, NULL);
    buf_free(response_buf);

    session_id_free(session_id);
    free(session_id_str);
    return HANDLER_SUCCESS;
}
...
/* Handler description: JSON for both datatypes plus the two callbacks above. */
DIFFUSION_REQUEST_HANDLER_T request_handler = {
    .request_datatype = DATATYPE_JSON,
    .response_datatype = DATATYPE_JSON,
    .on_active = on_active,
    .on_request = on_request
};
ADD_REQUEST_HANDLER_PARAMS_T parameters = {
    .path = message_path,
    .request_handler = &request_handler
};
add_request_handler(session, parameters);
Python
def callback(request: str, **kwargs) -> str:
print(f"Received request {request}")
return diffusion.datatype.JSON({"greetings": "stranger"})
...
handler = diffusion.messaging.RequestHandler(
callback,
request_type=diffusion.datatypes.JSON,
response_type=diffusion.datatypes.JSON
)
await session.messaging.add_request_handler(message_path, handler=handler)
Apple
/// Request handler delegate that logs each JSON request and replies with a fixed JSON value.
class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
    /// Logs the incoming request and responds with a fixed JSON value.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didReceiveRequestWith json: PTDiffusionJSON,
                                        context: PTDiffusionRequestContext,
                                        responder: PTDiffusionResponder) {
        // Swift's `print` does not expand `%@` placeholders; use interpolation instead.
        print("Received request: \(json)")
        // `try!` is tolerable here only because the JSON text is a fixed, well-formed literal.
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    /// Logs that the registration has closed.
    func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
        print("Message path is now closed")
    }

    /// Logs the error reported for the registration.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didFailWithError error: Error) {
        print("Message path failed with error: \(error.localizedDescription)")
    }
}
...
let handler = JSONRequestHandler()
let request_stream = PTDiffusionJSON.requestHandler(with: handler)
session.messaging.add(request_stream, forPath: message_path) { (registration, error) in
    // Report success only when the completion block carries no error.
    if let error = error {
        print("Message path \(message_path) failed to register: \(error.localizedDescription)")
    } else {
        print("Message path \(message_path) has been successfully registered.")
    }
}