Change streams in MongoDB

Tram Ho

1. What are Change Streams?

If you have worked with an RDBMS such as MySQL or SQL Server, you have probably heard of triggers. So when you start working with MongoDB, you may wonder: what does a trigger look like in MongoDB?

The MongoDB server has no mechanism for defining triggers inside the database the way MySQL does. However, if we can stream the changes happening in the database, we can implement the same behaviour as triggers on the application-server side, and even more.

Starting with version 3.6, MongoDB provides a feature called Change Streams. Change streams work by listening to the oplog, which you can think of simply as the log MongoDB uses for replication: it records every data modification in your database. Because they rely on the oplog, change streams are available on replica sets and sharded clusters, but not on a standalone server.

This means you can use change streams to subscribe to changes on a single collection, on a whole database, or even on an entire deployment.
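
To make the idea of an application-side trigger concrete, here is a minimal sketch. The names makeTrigger, auditLog and the handlers are hypothetical. It only shows the dispatch step: each change event is routed to a handler chosen by its operationType. Wiring it to a real change stream is a separate step.

```javascript
// Hypothetical sketch: emulate a trigger on the application side by routing
// each change event to a handler chosen by its operationType.
function makeTrigger(handlers) {
  return function onChange(changeEvent) {
    const handler = handlers[changeEvent.operationType];
    // Event types without a registered handler are ignored.
    return handler ? handler(changeEvent) : undefined;
  };
}

// Usage with hand-written sample events; in an application, `trigger`
// would be called for every event delivered by a change stream.
const auditLog = [];
const trigger = makeTrigger({
  insert: (e) => auditLog.push(`inserted ${e.documentKey._id}`),
  delete: (e) => auditLog.push(`deleted ${e.documentKey._id}`),
});

trigger({ operationType: "insert", documentKey: { _id: 1 } });
trigger({ operationType: "update", documentKey: { _id: 1 } }); // no handler, ignored
trigger({ operationType: "delete", documentKey: { _id: 1 } });
console.log(auditLog); // [ 'inserted 1', 'deleted 1' ]
```

Keeping the dispatch logic separate from the stream makes the handlers easy to test with sample events, as above.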

2. Stream A Collection / Database / Deployment

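To open a change stream on a deployment that enforces access control, your user needs privileges that grant the changeStream and find actions on the resource being watched (a collection, a database, or the whole deployment).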
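
As a sketch of what such a privilege looks like, the helper below builds the resource document for each scope. It follows MongoDB's convention that an empty string in a resource means "all". The role name inventoryWatcher and the test.inventory namespace are hypothetical examples.

```javascript
// Build the privilege needed to open a change stream. Following MongoDB's
// resource-document convention, an empty string means "all": empty
// collection = every non-system collection of the database, empty db and
// collection = every database in the deployment.
function changeStreamPrivilege(dbName = "", collectionName = "") {
  return {
    resource: { db: dbName, collection: collectionName },
    actions: ["changeStream", "find"],
  };
}

// Hypothetical role allowing a service to watch only test.inventory.
const watcherRole = {
  role: "inventoryWatcher",
  privileges: [changeStreamPrivilege("test", "inventory")],
  roles: [],
};

// In mongosh, connected to the "test" database as a user allowed to
// create roles, you could pass this document to db.createRole(watcherRole).
console.log(JSON.stringify(watcherRole.privileges[0]));
// {"resource":{"db":"test","collection":"inventory"},"actions":["changeStream","find"]}
```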

a. Open a cursor to stream a Collection / Database / Deployment

  • Collection: you can watch any collection except system collections and collections in the admin, local and config databases. Run db.collection.watch() to get started.
  • Database: since version 4.0, you can watch any database except admin, local and config with db.watch().
  • Deployment: since version 4.0, you can track changes to all databases and collections in the deployment, except those in admin, local and config, with Mongo.watch().
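
The three scopes above map onto the watch() methods of the official Node.js driver, which exist on Collection, Db and MongoClient. The sketch below assumes client is a connected MongoClient. The helper name openChangeStream and the connection string in the usage comment are hypothetical.

```javascript
// Open a change stream at collection, database or deployment scope.
// `client` is assumed to be a connected MongoClient from the official
// Node.js driver, whose Collection, Db and MongoClient classes all
// provide watch(pipeline, options).
function openChangeStream(client, { db, collection } = {}, pipeline = [], options = {}) {
  if (db && collection) {
    // Same scope as db.collection.watch() in the shell.
    return client.db(db).collection(collection).watch(pipeline, options);
  }
  if (db) {
    // Same scope as db.watch(): every non-system collection of one database.
    return client.db(db).watch(pipeline, options);
  }
  // Same scope as Mongo.watch(): the whole deployment.
  return client.watch(pipeline, options);
}

// Example (needs the "mongodb" package and a replica set; the URI is hypothetical):
//   const { MongoClient } = require("mongodb");
//   const client = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
//   await client.connect();
//   const stream = openChangeStream(client, { db: "test", collection: "inventory" });
```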

b. Create change stream

Once watch() has returned a cursor for the stream, you can listen to it and handle each change event as MongoDB emits it.
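
With the Node.js driver, a change stream is an event emitter, so the listening step can be sketched as below. The collection is assumed to be a driver Collection, and listenForChanges and onEvent are hypothetical names. The stream is opened with { fullDocument: 'updateLookup' } so that update events also carry the document.

```javascript
// Listen for change events with the Node.js driver's event API.
// `collection` is assumed to be a driver Collection; `onEvent` is a
// hypothetical application callback (for example a trigger handler).
function listenForChanges(collection, onEvent, onError = console.error) {
  // updateLookup makes update events carry the current fullDocument too.
  const changeStream = collection.watch([], { fullDocument: "updateLookup" });
  changeStream.on("change", (changeEvent) => onEvent(changeEvent));
  changeStream.on("error", onError);
  return changeStream; // call changeStream.close() to stop listening
}
```

Returning the stream lets the caller close it later, which also matters when saving resume tokens.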

You can also consume the stream as an iterator instead, pulling one change event at a time.
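
A minimal sketch of the iterator style, using the hasNext() and next() methods of the Node.js driver's ChangeStream, both of which return promises. The function name readChanges and its limit parameter are hypothetical. The limit is only there so that the loop ends, because on a live stream hasNext() waits for the next change.

```javascript
// Pull events one at a time instead of registering a listener.
// Stops after `limit` events (a hypothetical parameter for this sketch)
// and always closes the stream afterwards.
async function readChanges(changeStream, limit) {
  const events = [];
  try {
    while (events.length < limit && (await changeStream.hasNext())) {
      events.push(await changeStream.next());
    }
  } finally {
    await changeStream.close();
  }
  return events;
}
```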

c. changeEvent object

Each item delivered by a change stream is a changeEvent object: a document describing one change, with the following main fields:

  • _id: the resume token of the event, which you can use to resume the stream later.
  • operationType: the type of event that occurred: insert, update, replace, delete, drop, rename, dropDatabase or invalidate.
  • fullDocument: the document affected by an insert, update or replace operation. It is omitted for delete, because the document no longer exists. For update, it is only present if you open the stream with the option { fullDocument: 'updateLookup' }, in which case it holds the most recent majority-committed version of the document at lookup time.
  • ns: the database and collection affected by the event.
  • to: for a rename event, the new database and collection names.
  • documentKey: the _id of the changed document (for a sharded collection, the shard key fields are included as well).
  • updateDescription: for update events, the fields that were updated (updatedFields) and removed (removedFields).
  • clusterTime: the timestamp of the oplog entry for the event.
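
To show how these fields fit together, the sketch below turns a change event into a one-line summary. The sample event is hand-written, and its resume token value is made up. The describeChange helper is hypothetical, but the field names (ns.db, ns.coll, to, documentKey, updateDescription.updatedFields and removedFields) are the ones MongoDB uses.

```javascript
// Turn a change event into a one-line summary using the fields listed above.
function describeChange(changeEvent) {
  const { operationType, ns, documentKey } = changeEvent;
  // dropDatabase events only carry ns.db; invalidate events carry no ns.
  const where = ns ? (ns.coll ? `${ns.db}.${ns.coll}` : ns.db) : "(no namespace)";
  switch (operationType) {
    case "insert":
    case "replace":
      return `${operationType} ${documentKey._id} in ${where}: ${JSON.stringify(changeEvent.fullDocument)}`;
    case "update": {
      const { updatedFields, removedFields } = changeEvent.updateDescription;
      return `update ${documentKey._id} in ${where}: set ${Object.keys(updatedFields).join(",")}; unset ${removedFields.join(",")}`;
    }
    case "delete":
      return `delete ${documentKey._id} in ${where}`;
    case "rename":
      return `rename ${where} -> ${changeEvent.to.db}.${changeEvent.to.coll}`;
    default:
      return `${operationType} on ${where}`;
  }
}

// A hand-written update event; the _id (resume token) value is made up.
const sampleUpdate = {
  _id: { _data: "sample-token" },
  operationType: "update",
  ns: { db: "test", coll: "inventory" },
  documentKey: { _id: 1 },
  updateDescription: { updatedFields: { qty: 25 }, removedFields: ["tag"] },
};
console.log(describeChange(sampleUpdate)); // update 1 in test.inventory: set qty; unset tag
```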

You can find more details in the Change Events page of the official MongoDB documentation.

3. Resume a Change Stream

Suppose a series of change events is being streamed and, for some reason, your application stops at a particular event: the connection to the database server goes down, or you have to disconnect from it deliberately. Without extra handling, you would miss the events that happen right after that point.

MongoDB solves this problem with resume tokens: you store the token of the last change event you processed, and later resume listening from the event right after it.

In practice, you read the resumeToken (each change event carries it in its _id field), close the change stream, and later open a new one with the option { resumeAfter: resumeToken } to continue right after that event.
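
A minimal sketch of this resume flow, assuming a driver-style collection.watch(pipeline, options). The tokenStore here is a hypothetical in-memory object. A real application would persist the token somewhere durable so that it survives a restart. The helper names handleAndRemember and reopenChangeStream are also hypothetical.

```javascript
// Process an event, then remember its resume token (the event's _id).
// Saving the token only after processing means an event may be handled
// twice after a crash, but never skipped.
function handleAndRemember(changeEvent, tokenStore, processEvent) {
  processEvent(changeEvent);
  tokenStore.lastToken = changeEvent._id;
}

// Reopen the stream right after the last processed event, or from "now"
// if no token has been stored yet.
function reopenChangeStream(collection, tokenStore, options = {}) {
  const watchOptions = tokenStore.lastToken
    ? { ...options, resumeAfter: tokenStore.lastToken }
    : options;
  return collection.watch([], watchOptions);
}
```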

Note:

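  • resumeAfter continues the stream right after the event identified by resumeToken. It cannot be used with the token of an invalidate event (for example, one caused by dropping or renaming the watched collection), so such a stream cannot be resumed this way.
  • Since version 4.2, you can use startAfter instead of resumeAfter. Rather than resuming the old stream, it starts a new change stream after the event identified by the token, and it accepts the token of an invalidate event, so you can keep watching after an invalidation.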
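
The choice between the two options can be sketched as a small helper. It assumes you kept the last event you received and that the server is on version 4.2 or later. The name continuationOptions is hypothetical. Only one of the two options is set, because they are alternatives.

```javascript
// Pick the option used to continue from the last received event.
// resumeAfter cannot continue past an invalidate event, so for such an
// event this sketch uses startAfter (MongoDB 4.2+) to open a new stream.
function continuationOptions(lastEvent) {
  if (!lastEvent) return {}; // nothing stored yet: start from now
  return lastEvent.operationType === "invalidate"
    ? { startAfter: lastEvent._id }
    : { resumeAfter: lastEvent._id };
}

// Usage (collection is assumed to be a driver Collection):
//   const stream = collection.watch([], continuationOptions(lastEvent));
```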

4. Conclusion

Change streams are a useful feature and are not difficult to work with. We can use them to react to database events in real time. Hopefully this article helps you when working with MongoDB.


Source: Viblo