1. What are Change Streams?
If you have worked with an RDBMS such as MySQL or SQL Server, you have probably heard of triggers. When you move to MongoDB, you may wonder: what does a trigger look like in MongoDB?
The MongoDB server has no built-in way to define triggers inside the database the way MySQL does. However, if we can stream the changes happening in the database, we can implement the same behavior as triggers on the application-server side, and even more.
Starting with version 3.6, MongoDB provides exactly this through a feature called Change Streams. Change streams work by listening to the oplog, which you can think of simply as the log MongoDB uses for replication: it records every data modification in your database. Because they depend on the oplog, change streams are available only on replica sets and sharded clusters, not on a standalone server.
This lets you use change streams to subscribe to changes on a single collection, a database, or even an entire deployment.
2. Streaming a Collection / Database / Deployment
To open a change stream, the user must have privileges that grant the `changeStream` and `find` actions on the resource being watched, for example:
```
{
  resource: { db: <dbname>, collection: <collection> },
  actions: [ "find", "changeStream" ]
}
```
a. Open a change stream cursor on a collection / database / deployment
- Collection: you can watch any collection except system collections and collections in the `admin`, `local`, and `config` databases. Use `db.collection.watch()` to get started.
- Database: you can watch any database except `admin`, `local`, and `config` (MongoDB 4.0+). Use `db.watch()`.
- Deployment: you can watch changes to all non-system collections in all databases except `admin`, `local`, and `config` (MongoDB 4.0+). Use `Mongo.watch()`.
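The three levels differ only in which namespaces they cover, so the rules above can be read as a membership test on an event's `ns` field. The sketch below models that test; `inScope`, `isSystemNamespace`, and the `scope` object shape are hypothetical names for illustration. The server applies the real filtering, so your code never needs to do this itself.

```javascript
// Hypothetical model of which namespaces each watch() level covers.
// It only mirrors the documented exclusions; the server enforces the real rules.
const EXCLUDED_DBS = new Set(['admin', 'local', 'config']);

function isSystemNamespace(ns) {
  // Change streams never report events from these databases or from system.* collections
  return EXCLUDED_DBS.has(ns.db) || (ns.coll !== undefined && ns.coll.startsWith('system.'));
}

// scope: { level: 'collection' | 'database' | 'deployment', db?: string, coll?: string }
// ns: the "ns" field of a change event, e.g. { db: 'shop', coll: 'orders' }
function inScope(scope, ns) {
  if (isSystemNamespace(ns)) return false;
  switch (scope.level) {
    case 'collection': // db.collection.watch()
      return ns.db === scope.db && ns.coll === scope.coll;
    case 'database': // db.watch()
      return ns.db === scope.db;
    case 'deployment': // Mongo.watch()
      return true;
    default:
      throw new Error(`unknown scope level: ${scope.level}`);
  }
}

console.log(inScope({ level: 'database', db: 'shop' }, { db: 'shop', coll: 'orders' })); // true
console.log(inScope({ level: 'deployment' }, { db: 'config', coll: 'settings' })); // false
```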
b. Create a change stream
With the Node.js driver, you open the change stream with `watch()` and handle each change event as it arrives:
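```javascript
// Assumes `db` is a connected Db instance from the Node.js driver
const collection = db.collection('test');
// Could also be db.watch(), or client.watch() on a MongoClient for the whole deployment
const changeStream = collection.watch();
changeStream.on('change', changeEvent => {
  // process the change event
});
```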
You can also consume the stream as an iterator:
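```javascript
// Run inside an async function (or an ES module, which allows top-level await)
const collection = db.collection('test');
const changeStreamIterator = collection.watch();
// next() resolves with the next change event once one is available
const next = await changeStreamIterator.next();
```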
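In the iterator style you normally call `next()` in a loop. The sketch below does that and stops after an `invalidate` event, because the server closes the change stream once it delivers one (for example after the watched collection is dropped). `consumeUntilInvalidate` and `fakeStream` are hypothetical helpers; the loop only relies on an async `next()` method, which the driver's change stream provides, so the usage example drives it with a fake stream instead of a real database.

```javascript
// Hypothetical helper: pull events one at a time until an invalidate event arrives.
// `stream` only needs an async next() method, like the driver's change stream.
async function consumeUntilInvalidate(stream, handleEvent) {
  let processed = 0;
  for (;;) {
    const event = await stream.next(); // waits for the next change event
    if (event.operationType === 'invalidate') {
      // The server closes the change stream after an invalidate event
      return processed;
    }
    await handleEvent(event);
    processed += 1;
  }
}

// Usage with a fake stream that replays a fixed list of events
function fakeStream(events) {
  let i = 0;
  return { next: async () => events[i++] };
}

const seen = [];
consumeUntilInvalidate(
  fakeStream([
    { operationType: 'insert', documentKey: { _id: 1 } },
    { operationType: 'delete', documentKey: { _id: 1 } },
    { operationType: 'drop' },
    { operationType: 'invalidate' },
  ]),
  event => { seen.push(event.operationType); },
).then(count => console.log(count, seen)); // 3 [ 'insert', 'delete', 'drop' ]
```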
c. The changeEvent object
What exactly is the `changeEvent` object in the code above? It is a document of the following form:
```
{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection>"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   },
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}
```
The main fields are:
- `operationType`: the type of event that occurred: `insert`, `update`, `replace`, `delete`, `drop`, `rename`, `dropDatabase`, or `invalidate`.
- `fullDocument`: the document written by a CRUD operation (`insert`, `update`, `replace`). It is omitted for `delete` events because the document no longer exists. For `update` events, the field is present only if you open the stream with `fullDocument: 'updateLookup'`; it then holds the current majority-committed version of the document, which may already include later changes:
```javascript
// watch(pipeline, options): pass an empty pipeline first, then the options
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
```
- `ns`: the database and collection affected by the event.
- `to`: for `rename` events, the new database and collection names.
- `documentKey`: the `_id` of the changed document (plus the shard key for sharded collections).
- `updateDescription`: the fields updated or removed by an `update` operation.
- `clusterTime`: the timestamp of the event's entry in the oplog.
For more details on each field, see the Change Events page in the MongoDB manual.
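To see how these fields fit together, here is a sketch of a trigger-like consumer that keeps an in-memory copy of one collection in sync: `insert` and `replace` use `fullDocument`, `delete` uses only `documentKey`, and `update` either takes `fullDocument` (with `updateLookup`) or patches the cached copy from `updateDescription`, whose paths may be dotted such as `"address.city"`. `CollectionMirror`, `setPath`, and `unsetPath` are hypothetical names, and the sketch handles only `updatedFields` and `removedFields`.

```javascript
// Hypothetical in-memory mirror of one collection, kept in sync from change events.
// Documents are keyed by a JSON string of documentKey._id.
class CollectionMirror {
  constructor() {
    this.docs = new Map();
  }

  static keyOf(event) {
    return JSON.stringify(event.documentKey._id);
  }

  apply(event) {
    switch (event.operationType) {
      case 'insert':
      case 'replace':
        // fullDocument holds the whole new document for these operations
        this.docs.set(CollectionMirror.keyOf(event), event.fullDocument);
        break;
      case 'update': {
        const key = CollectionMirror.keyOf(event);
        if (event.fullDocument) {
          // Present only when the stream was opened with fullDocument: 'updateLookup'
          this.docs.set(key, event.fullDocument);
          break;
        }
        const doc = this.docs.get(key);
        if (!doc) break; // never saw this document, so there is nothing to patch
        const { updatedFields = {}, removedFields = [] } = event.updateDescription;
        for (const [path, value] of Object.entries(updatedFields)) setPath(doc, path, value);
        for (const path of removedFields) unsetPath(doc, path);
        break;
      }
      case 'delete':
        // No fullDocument for deletes; documentKey identifies what was removed
        this.docs.delete(CollectionMirror.keyOf(event));
        break;
      case 'drop':
      case 'dropDatabase':
        this.docs.clear();
        break;
      default:
        // rename, invalidate, ...: nothing to patch in this simple sketch
        break;
    }
  }
}

// Set a possibly dotted path such as "address.city", creating objects as needed
function setPath(doc, path, value) {
  const parts = path.split('.');
  let target = doc;
  for (const part of parts.slice(0, -1)) {
    if (typeof target[part] !== 'object' || target[part] === null) target[part] = {};
    target = target[part];
  }
  target[parts[parts.length - 1]] = value;
}

// Remove a possibly dotted path; does nothing if an intermediate object is missing
function unsetPath(doc, path) {
  const parts = path.split('.');
  let target = doc;
  for (const part of parts.slice(0, -1)) {
    if (typeof target[part] !== 'object' || target[part] === null) return;
    target = target[part];
  }
  delete target[parts[parts.length - 1]];
}

const mirror = new CollectionMirror();
mirror.apply({ operationType: 'insert', documentKey: { _id: 1 }, fullDocument: { _id: 1, name: 'An', address: { city: 'Hue' }, tmp: true } });
mirror.apply({
  operationType: 'update',
  documentKey: { _id: 1 },
  updateDescription: { updatedFields: { 'address.city': 'Hanoi' }, removedFields: ['tmp'] },
});
console.log(mirror.docs.get('1')); // { _id: 1, name: 'An', address: { city: 'Hanoi' } }
```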
3. Resume a Change Stream
Suppose your application is consuming a series of change events and stops partway through: the connection to the database server drops, or you need to disconnect on purpose. Without extra work, you cannot handle the events that happen after that point.
MongoDB solves this with resume tokens: you store the token of the last change event you handled, and later resume listening from the event right after it.
```javascript
const collection = db.collection('test');
const changeStream = collection.watch();
let newChangeStream;

changeStream.once('change', next => {
  // Save the token of the event we just received, then stop this stream
  const resumeToken = changeStream.resumeToken;
  changeStream.close();

  // Open a new stream that continues right after that event
  newChangeStream = collection.watch([], { resumeAfter: resumeToken });
  newChangeStream.on('change', changeEvent => {
    // process the change event
  });
});
```
Here we close the change stream right after reading its `resumeToken`, then open a new one with the option `{ resumeAfter: resumeToken }` so that it continues from that point.
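In practice the token has to outlive the process, otherwise there is nothing to resume from after a crash. A common pattern is to save the token only after an event has been handled, so that on restart you reopen the stream with `{ resumeAfter: savedToken }` and, at worst, reprocess the last event. The sketch below shows that ordering using each event's `_id`, which is the event's resume token. `TokenStore`, `watchOptions`, and `handleAndCheckpoint` are hypothetical; a real store would write to a file or a database, and the `{ _data: 'token-1' }` value is a placeholder, not a real token.

```javascript
// Hypothetical token store; a real one would persist to disk or a database.
class TokenStore {
  constructor() {
    this.token = null;
  }
  save(token) {
    this.token = token;
  }
  load() {
    return this.token;
  }
}

// Options for collection.watch([], options): resume after the saved token, if any.
function watchOptions(store) {
  const token = store.load();
  return token ? { resumeAfter: token } : {};
}

// Handle the event first, then checkpoint its _id (the event's resume token).
// If the process dies between the two steps, the event is delivered again on resume.
async function handleAndCheckpoint(event, handler, store) {
  await handler(event);
  store.save(event._id);
}

const store = new TokenStore();
console.log(watchOptions(store)); // {}
handleAndCheckpoint({ _id: { _data: 'token-1' }, operationType: 'insert' }, async () => {}, store)
  .then(() => console.log(watchOptions(store))); // { resumeAfter: { _data: 'token-1' } }
```

Saving after handling gives at-least-once processing; saving before handling would instead risk skipping an event that was never processed.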
Note:
- `resumeAfter` resumes the stream right after the event identified by `resumeToken`. If that event is an `invalidate` event (for example, the watched collection was dropped or renamed), the stream cannot be resumed this way.
- Since version 4.2, you can use `startAfter` instead of `resumeAfter`. Rather than resuming the old stream, it starts a new change stream after the event identified by the token, so it also works when that event is an `invalidate` event.
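The two options can be combined into a small decision when reopening a stream: use `resumeAfter` normally, and switch to `startAfter` when the last saved event is an `invalidate` event. The `reopenOptions` helper below is a hypothetical sketch of that choice; it assumes the server is MongoDB 4.2 or later whenever the `startAfter` branch is taken.

```javascript
// Hypothetical helper: build watch() options from the last event the app saved.
// lastEvent is the full change event (its _id is the resume token) or null.
function reopenOptions(lastEvent) {
  if (!lastEvent) return {}; // nothing saved: start watching from now
  if (lastEvent.operationType === 'invalidate') {
    // resumeAfter cannot resume after an invalidate event; startAfter (4.2+) opens a new stream
    return { startAfter: lastEvent._id };
  }
  return { resumeAfter: lastEvent._id };
}

console.log(reopenOptions({ _id: { _data: 't1' }, operationType: 'update' })); // { resumeAfter: { _data: 't1' } }
console.log(reopenOptions({ _id: { _data: 't2' }, operationType: 'invalidate' })); // { startAfter: { _data: 't2' } }
```

The result is meant to be passed as the second argument, as in `collection.watch([], reopenOptions(lastEvent))`.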
4. Conclusion
Change streams are a useful feature and not difficult to work with: they let you react to changes in the database in real time. Hopefully this article helps you when working with MongoDB.