Fetching the current value of a topic
A client can send a fetch request for the values and/or topic specifications of a set of topics. The result set can be filtered by topic selector and topic path range.
Required permissions:
and permissions for the specified topicsA fetch request enables you to retrieve values and/or topic specifications for a set of topics without subscribing to the topics.
In Diffusion™ 6.2, there is a new enhanced fetch API, and the old fetch API is deprecated. This documentation describes the new API.
Results can be filtered by topic selector and topic path range. A request can specify the maximum number of results to return as well as a maximum topic path depth, avoiding the inefficient transfer of very large result sets.
A request can specify the value type of topics to be returned, in which case only topics of types compatible with the given value type will be returned. Returned values will be typed accordingly, avoiding the need for data conversion.
Here is how to fetch the value of a topic or set of topics by using a topic selector to make a fetch request:
const result = await session.fetchRequest() .withValues(diffusion.datatypes.string()) .fetch('*.*');
var session = Diffusion.Sessions .Principal( "client" ) .Password( "password" ) .Open( serverUrl ); var topics = session.Topics; var result = await topics.FetchRequest .WithValues<string>() .FetchAsync( "*.*" );
session = Diffusion.sessions().principal("client").password("password") .open(serverUrl); topics = session.feature(Topics.class); FetchResult<String> result = topics.fetchRequest() .withValues(String.class) .fetch("*.*").get(5, SECONDS);
DIFFUSION_FETCH_REQUEST_T *request = diffusion_fetch_request_init(session); DIFFUSION_DATATYPE datatype = DATATYPE_STRING; diffusion_fetch_request_with_values(request, &datatype, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params); diffusion_fetch_request_free(request);
session.topics .fetchRequest() .fetchStringValues(withTopicSelectorExpression: "*.*") { (fetch_result, error) in if (error != nil) { print("An error occurred while fetching: %@", error!.localizedDescription) return } }
To get the result set and print the results:
result.results().forEach((topicResult) => { console.log('Path: ' + topicResult.path()); console.log('Type: ' + topicResult.type()); console.log('Value: ' + topicResult.value()); });
foreach ( var item in result.Results ) { Console.WriteLine( $"{item.Type} : {item.Path}" ); }
List<TopicResult<Void>> results = result.results(); results.forEach(t -> { System.out.println(t.type() + " : " + t.path()); });
LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); for (int i = 0; i < list_get_size(results); i++) { DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, i); char *topic_path = diffusion_topic_result_get_path(topic_result); DIFFUSION_VALUE_T *value = diffusion_topic_result_get_value(topic_result); char *value_string; read_diffusion_string_value(value, &value_string, NULL); printf("%s: %s\n", topic_path, value_string); free(value_string); } list_free(results, (void (*)(void *))diffusion_topic_result_free);
for entry in fetch_result!.results { print("%@: %@", entry.specification.type, entry.path) }
Fetching topic specifications
You can return topic specifications instead of values for each topic selected.
const result = await session.fetchRequest() .withProperties() .fetch('*.*'); const topicResult = result.results()[0]; const properties = topicResult.specification().properties;
var result = await topics.FetchRequest .WithProperties() .FetchAsync( "*Accounts/" ); var topicResult = result.Results.First(); var properties = topicResult.Specification.Properties;
FetchResult<Void> result = topics.fetchRequest() .withProperties() .fetch("*Accounts/").get(5, SECONDS); TopicResult<Void> topicResult = result.get(0); Map<String, String> properties = topicResult.specification().getProperties();
static int on_fetch_result_with_properties( const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); for (int i = 0; i < list_get_size(results); i++) { DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, i); TOPIC_SPECIFICATION_T *specification = diffusion_topic_result_get_specification(topic_result); HASH_T *properties = topic_specification_get_properties(specification); char **keys = hash_keys(properties); char *key = *keys; while (key != NULL) { char *value = hash_get(properties, key); printf("%s: %s\n", key, value); key = *keys++; } free(keys); hash_free(properties, free, free); } list_free(results, (void (*)(void *))diffusion_topic_result_free); return HANDLER_SUCCESS; } void fetch_with_properties(SESSION_T *session) { DIFFUSION_FETCH_REQUEST_T *request = diffusion_fetch_request_init(session); diffusion_fetch_request_with_properties(request, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = request, .on_fetch_result = on_fetch_result_with_properties }; diffusion_fetch_request_fetch(session, params); diffusion_fetch_request_free(request); }
session.topics .fetchRequest() .withProperties() .fetch(withTopicSelectorExpression: "Accounts/") { (fetch_result, error) in if (error != nil) { print("An error occurred while fetching: %@", error!.localizedDescription) return } let topic_result = fetch_result!.results.first print("Properties: %@", topic_result!.specification.properties) }
Filtering by topic type
The results can also be restricted to topics of a particular topic type or types:const result = await session.fetchRequest() .topicTypes([diffusion.topics.TopicType.STRING, diffusion.topics.TopicType.INT64]) .fetch('*Accounts/');
var result = await topics.FetchRequest .TopicTypes( new[] { TopicType.STRING, TopicType.INT64 } ) .FetchAsync( "*Accounts/" );
FetchResult<Void> result = topics.fetchRequest() .topicTypes(EnumSet.of(TopicType.STRING, TopicType.INT64)) .fetch("*Accounts/").get(5, SECONDS);
DIFFUSION_FETCH_REQUEST_T *request = diffusion_fetch_request_init(session); TOPIC_TYPE_T topic_type_string = TOPIC_TYPE_STRING; TOPIC_TYPE_T topic_type_int64 = TOPIC_TYPE_INT64; TOPIC_TYPE_T topic_type_json = TOPIC_TYPE_JSON; SET_T *topic_types = set_new_int(3); set_add(topic_types, &topic_type_string); set_add(topic_types, &topic_type_int64); set_add(topic_types, &topic_type_json); diffusion_fetch_request_topic_types(request, topic_types, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params); diffusion_fetch_request_free(request);
var topic_types = Set<NSNumber>() topic_types.insert(NSNumber(nonretainedObject: PTDiffusionTopicType.string)) topic_types.insert(NSNumber(nonretainedObject: PTDiffusionTopicType.int64)) session.topics .fetchRequest() .topicTypes(topic_types) .fetch(withTopicSelectorExpression: "Accounts/") { _, _ in }
Restricting the results to a range of topics
You can restrict the returned results to within a specified range of topics. All the topics within the selection that have a path that is lexically within the specified range will be returned, at all levels.
You can specify either a start point or an end point or both. For example, if you specify a start point but no end point, results will be returned from the start point up to the end of the topic tree.
The specified start and end points do not need to represent topics that actually exist.
const result = await session.fetchRequest() .withValues(diffusion.datatypes.binary()) .from('Accounts/Dept05') .to('Accounts/Dept10') .fetch('*Accounts/');
var result = await topics.FetchRequest .WithValues<IBytes>() .From( "Accounts/Dept05" ) .To( "Accounts/Dept10" ) .FetchAsync( "*Accounts/" );
FetchResult<Bytes> result = topics.fetchRequest() .withValues(Bytes.class) .from("Accounts/Dept05") .to("Accounts/Dept10") .fetch("*Accounts/").get(5, SECONDS);
DIFFUSION_FETCH_REQUEST_T *request = diffusion_fetch_request_init(session); DIFFUSION_DATATYPE datatype = DATATYPE_BINARY; diffusion_fetch_request_with_values(request, &datatype, NULL); diffusion_fetch_request_from(request, "Accounts/Dept05", NULL); diffusion_fetch_request_to(request, "Accounts/Dept10", NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*Accounts/", .fetch_request = request, .on_fetch_result = on_fetch_account_results }; diffusion_fetch_request_fetch(session, params); diffusion_fetch_request_free(request);
session.topics .fetchRequest() .fromTopicPath("Accounts/Dept05") .toTopicPath("Accounts/Dept10") .fetchBinaryValues(withTopicSelectorExpression: "Accounts/") { _, _ in }
Paging through topics
You can specify a non-inclusive range using the after and before methods. You can limit the number of results and check if there are further results remaining.
By combining these, you can page through a topic tree. This can be useful when there is a large number of topics and you wish to access it in manageable chunks, for example when presenting results from a large set into a limited window in a user interface.
Here is an example of paging through all string topics in a topic tree, in chunks of 20:
const request = session.fetchRequest() .withValues(diffusion.datatypes.binary()) .first(20); let result = await request.fetch('*.*'); if (result.hasMore()) { result = await request.after(result.results()[19].path()).fetch('*.*'); }
var request = topics.FetchRequest .WithValues<string>() .First( 20 ); var result = await request.FetchAsync( "*.*" ); if ( result.HasMore ) { result = await request .After( result.Results.Last().Path ) .FetchAsync( "*.*" ); }
FetchRequest request = topics.fetchRequest() .withValues(String.class) .first(20); FetchResult<String> result = request.fetch("*.*").get(5, SECONDS); if (result.hasMore()) { result = request.after(result.results.get(19).path()).fetch("*.*"); }
// global variables SESSION_T *g_session; DIFFUSION_FETCH_REQUEST_T *g_fetch_request; static int on_fetch_result_with_paging( const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { if (!diffusion_fetch_result_has_more(fetch_result)) { // reached the end of the results return HANDLER_SUCCESS; } LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, 19); diffusion_fetch_request_after( g_fetch_request, diffusion_topic_result_get_path(topic_result), NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = g_fetch_request, .on_fetch_result = on_fetch_result_with_paging }; diffusion_fetch_request_fetch(g_session, params); list_free(results, (void (*)(void *))diffusion_topic_result_free); return HANDLER_SUCCESS; } void fetch_with_paging() { g_fetch_request = diffusion_fetch_request_init(g_session); DIFFUSION_DATATYPE datatype = DATATYPE_STRING; diffusion_fetch_request_with_values(g_fetch_request, &datatype, NULL); diffusion_fetch_request_first(g_fetch_request, 20, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = g_fetch_request, .on_fetch_result = on_fetch_result_with_paging }; diffusion_fetch_request_fetch(g_session, params); diffusion_fetch_request_free(g_fetch_request); }
session.topics .fetchRequest() .first(20) .fetchStringValues(withTopicSelectorExpression: "*.*") { (fetch_result, error) in if (fetch_result!.hasMore) { session.topics .fetchRequest() .afterTopicPath(fetch_result!.results.last!.path) .fetchStringValues(withTopicSelectorExpression: "*.*") { _, _ in } } }
Fetching unpublished delayed topics
Delayed topic views create reference topics in an unpublished state. The topics are published once the delay time has expired.
A client session can use a fetch request with the withUnpublishedDelayedTopics method to retrieve any unpublished topics.
A reference topic in the unpublished state which matches the query will only be included in the fetch results if the session has read_topic permission for the reference's source topic as well as read_topic permission for the reference topic.
DIFFUSION_FETCH_REQUEST_T *request = diffusion_fetch_request_init(session); DIFFUSION_DATATYPE datatype = DATATYPE_STRING; diffusion_fetch_request_with_values(request, &datatype, NULL); diffusion_fetch_request_with_unpublished_delayed_topics(request, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = request, .on_fetch_result = on_fetch_result_with_paging }; diffusion_fetch_request_fetch(session, params); diffusion_fetch_request_free(request);
session.topics .fetchRequest() .withUnpublishedDelayedTopics() .fetchStringValues(withTopicSelectorExpression: "*.*") { _, _ in }