ObjectBox Data Observers and Reactive Extensions

ObjectBox makes it easy for your app to react to data changes by providing:

  • data observers
  • reactive extensions
  • optional library to work with RxJava

This toolbox makes data flow simple and takes care of threading details.




Reactive Observers: A first Example

Let’s start with an example to demonstrate what you can do with reactive data observers:

The first line creates a regular Query to get certain Task objects. The second line connects an observer to the query. This is what happens:

  • the query is executed in the background
  • once the query finishes the observer gets the query result data
  • whenever changes are made to Task objects in the future, the query will be executed again
  • once updated query results are in, they are propagated to the observer
  • the observer is called on Android’s main thread

So it’s just two lines of code – but a lot of stuff happening behind the scenes. Now, let’s dive into the details, shall we?

Data Observers Basics

When objects changes, ObjectBox notifies previously subscribed data observers. You can either subscribe to changes of certain Object types (via BoxStore), or to Query results. Either way, you need to implement the generic io.objectbox.reactive.DataObserver interface:

This observer will be called by ObjectBox when necessary: typically shortly after subscribing and when data changes. Note that the onData() method is called asynchronously and decoupled from the thread causing the data change (e.g. the thread that committed a transaction).

Observing General Changes

The BoxStore allows to subscribe a DataObserver to Object types. Let’s say want to observe Task objects for a todo list app:

Then, let’s say some unrelated code of your app stores more users: ObjectBox will call myTaskObserver.onData(Task.class).

There is also a subscribe() variant taking no parameter: this will subscribe the observer to all available Object classes.

Observing Queries

ObjectBox let’s you build queries to find the objects matching certain criteria. Queries are an essential part of ObjectBox: whenever you need a specific set of data, you will probably use a Query.

The combination of Queries and Observers results in convenient and powerful tool. Query observers will deliver fresh query results whenever relevant data has been changed. Let’s say you display a list of todo tasks in your app. You can use a DataObserver to get the data and pass it to a method updateUi (lambda syntax):

So when is our observer lambda called? Immediately when an observer is subscribed, the query will be run (in a separate thread). Once the query result is available, it will be passed to the observer. To this is the first call to the observer. Now let’s say a tasks gets changed and stored in ObjectBox. It does not matter where and how; it might be the user who marked a task as completed, or some backend thread putting additional tasks during synchronization with the cloud. Either way, the Query will notify all observers with updated query results.

Note, that this pattern can greatly simplify your code: there is a single place where your data comes in to update your UI. There no more separate initialization code, nor wiring of events, rerunning queries, etc.

Observers and Transactions

Observer notifications occur after a transaction is committed. For some scenarios it is especially important to know transactions bounds. If you call Box.put() or remove() individually, an implicit transaction is started and committed. For example, this code fragment would trigger DataObservers on User.class twice:

There are several ways to combine several operations into one transaction, for example using one of the runInTx or callInTx methods in the BoxStore class. For our simple example, we can simply use an overload of put() accepting multiple objects:

This results in a single transaction and thus in a single DataObserver notification.

Subscriptions and their Cancellation

When you call observer(), it returns a subscription object implementing the io.objectbox.reactive.DataSubscription interface:

So, if you plan to unsubscribe your DataObserver later, it is a good idea to hold on to the DataSubscription. Call cancel() on it to let ObjectBox know that the Observer should not be notified anymore:

On Android, you would typically create the subscription in one of the onCreate/onStart/onResume methods and cancel it on their counterparts onDestroy/onStop/onPause.

Reactive Extensions

In the first part you saw how DataObservers can help you keeping your app state up to date. But there is more: ObjectBox comes with simple and convenient reactive extensions for typical tasks. While most of those is inspired by RxJava, it is not based on RxJava. ObjectBox brings its own features because not all developers are familiar with RxJava (for the RxJava ObjectBox library see below). We do not want to impose the complexity (Rx is almost like a new language to learn) and size of RxJava (~10k methods) on everyone. So, let’s keep it simple and neat for now.

Thread Scheduling

On Android, UI updates must occur on the main thread only. Luckily, ObjectBox allows to switch the observer from a background thread over to the main thread. Let’s look on the revised todo task example from above:

What is different to the previous task query example? The additional on() call is all that is needed to tell where we want our observer to be called. AndroidScheduler.mainThread() is a built-in Scheduler implementation. Alternatively, you can create an AndroidScheduler using a custom Looper, or build an fully custom scheduler by implementing the io.objectbox.reactive.Scheduler interface.

Transforming Data

Maybe you want to transform the data before you hand it over to an Observer. Let’s say, you want to keep track of count of all stored Objects for each type. The BoxStore subscription gives you the class of Objects, and this example show you how to transform this info into actual object counts:

Note that the transform operation takes a Class object and returns a Long number. Thus the DataObserver receives the Object count as a Long parameter in onData.

While the lambda syntax is nice and brief, let’s look at the io.objectbox.reactive.Transformer interface for clarification of what the transform() method expects as a parameter:

Some additional notes on Transformers:

  • Transforms are not required to actually “transform” any data. Technically, it’s fine to return the same data it received and just do some processing with (or without) it.
  • Transformer are always executed asynchronously. It is fine to perform long lasting operations.


Maybe you noticed that a Transformer may throw any type of exception. Also, a DataObserver might throw a RuntimeException. In both cases, you can provide an ErrorObserver to be notified about an occurred exception. The io.objectbox.reactive.ErrorObserver is straight-forward:

To specify your ErrorObserver, simply call the onError() method after subscribe().

Single Notifications vs. Only-Changes

When you subscribe to an Query, the DataObserver gets both by default:

  • Initial query results (right after subscribing)
  • Updated query results (underlying data was changed)

Sometimes you may by interested in only one of those. This is what the methods single() and onlyChanges() are for (call them after subscribe()). Single subscriptions are special in the way that they are cancelled automatically once the observer is notified. You can still cancel them manually to ensure no call to the observer is made at a certain point.

Weak References

Sometimes it may be nice to have DataObservers weakly referenced. Note that for the sake of a deterministic flow, it is advisable to explicitly cancel subscriptions explicitly whenever possible. If that does not scare you off, use weak() after subscribe().

Threading overview

To summarize threading as discussed earlier:

  • Query execution runs on a background thread (exclusive for this task)
  • DataTransformer runs on a background thread (exclusive for this task)
  • DataObserver and ErrorObserver run on a background thread unless a Scheduler is specified via the on() method.

ObjectBox RxJava Extension Library

By design, there are zero dependencies to any Rx libraries in the core of ObjectBox. As you saw before ObjectBox gives you simple means to transform data, asynchronous processing, thread scheduling, and one time (single) notifications. However, you still might want to integrate with mighty RxJava 2 (we have no plans to support RxJava 1). This is the purpose of the ObjectBox RxJava extension library. It requires an additional Gradle dependency (note that the version might become outdated on this page, so please check for newer versions):

With that you can use the classes RxQuery and RxBoxStore. Both offer static methods to subscribe using RxJava means.

For general Object changes, you can use RxBoxStore to create an Observable. RxQuery allows to subscribe to Query objects using:

  • Flowable
  • Observable
  • Single

Example usage:

The extension library is managed as a separate open source project on GitHub.



Spread the love
Sign up for fresh ObjectBox news here. No spam, just fresh developer news once in a while.