Sync data from MongoDB to Elasticsearch using Transporter
Recently, my team was asked to implement a web crawler to index the content of a few websites and store that data in a database for further analysis. After a long discussion we decided to go with MongoDB and Elasticsearch: MongoDB excels at storing the data, while Elasticsearch facilitates full-text search over it. Using MongoDB as the primary store and Elasticsearch for search seemed like a solid choice.
In this post, I'll show how to use the open-source utility Transporter to quickly copy data from MongoDB to Elasticsearch with custom transformations.
Build Transporter
I'm not going to go into much detail on this part, since the Transporter
documentation explains how to build it on both *nix and Windows systems. You can also use the Vagrant VM provisioned by Ansible.
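For reference, here is a minimal *nix install sketch; the release version and binary name are illustrative, not exact, so check the project's GitHub releases page for the actual artifact names:
# Grab a prebuilt binary from the releases page (version and file name illustrative)
wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-linux-amd64
chmod +x transporter-linux-amd64
sudo mv transporter-linux-amd64 /usr/local/bin/transporter
# Sanity check: 'about' lists the adaptors compiled into the binary
transporter about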
Elasticsearch and Mongo
I'll be using Docker containers for this small example to set up the Elasticsearch and MongoDB nodes, as shown in the docker-compose.yml
file below:
mongodb:
  image: mongo
  ports:
    - "27017:27017"
mongo-seed:
  build: ./mongo-seed
  links:
    - mongodb
elasticsearch:
  image: elasticsearch
  ports:
    - 9200:9200
    - 9300:9300
The role of the mongodb and elasticsearch services is obvious (I guess), while the mongo-seed service imports data into our mongodb container. The Dockerfile below uses the mongoimport command to seed the data; it creates a restaurants collection in the test database:
FROM mongo
COPY data.json /data.json
CMD mongoimport --host mongodb --db test --collection restaurants --drop --file /data.json
You can find the dataset in this link; you'll need to retrieve it and save it as a data.json file inside the mongo-seed directory, next to the Dockerfile.
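With the compose file, the Dockerfile, and data.json in place, the whole stack can be brought up in one go:
docker-compose up -d
The mongo-seed container runs mongoimport once against the mongodb service and then exits, leaving the restaurants collection populated.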
Running the Transformation
Now that we are done with the setup, it's time to sync and transform our data. Make sure both Elasticsearch and MongoDB are running!
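A quick way to check, assuming curl and the mongo shell are installed on the host (the ports are the ones mapped in the compose file above):
# Elasticsearch should answer with its version banner
curl http://localhost:9200
# MongoDB should answer { "ok" : 1 }
mongo --host 127.0.0.1:27017 --eval 'db.runCommand({ ping: 1 })'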
Now run the following command:
transporter init mongodb elasticsearch
This generates a basic pipeline.js
file in your current directory. A Transporter pipeline connects a source adaptor, any number of transformers, and any number of sink adaptors into a path that messages flow down; all of this is defined in pipeline.js.
After that, modify the generated pipeline.js
to look like this:
// MongoDB configuration
var source = mongodb({
  "uri": "mongodb://127.0.0.1:27017/test"
})

// Elasticsearch configuration
var sink = elasticsearch({
  "uri": "http://localhost:9200/test"
})

// Read the data, transform it, then save it
t.Source("source", source, "/.*/")
  .Transform(omit({ "fields": ["grades"] }))
  .Save("sink", sink, "/.*/")
source
and sink
are two JavaScript objects that represent our nodes. For each one we configured the required uri
parameter, which tells Transporter where to read from and where to write to. We also apply a basic transformation with omit, removing the grades
field from every document, which means it won't be sent to Elasticsearch.
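Before executing it for real, the pipeline can be sanity-checked with Transporter's test subcommand (available in the 0.5.x releases, as far as I know), which prints the compiled nodes without starting the pipeline, so no data is moved:
transporter test pipeline.js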
The final step is to execute the Transporter pipeline by running the following command:
transporter run
And voilà! Now query the Elasticsearch node to verify that the data was copied over, with our transformation applied:
curl -XGET 'localhost:9200/test/restaurants/_search?pretty=true'
// output
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 25359,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test",
        "_type" : "restaurants",
        "_id" : "5a3d8711531657e36b043eb5",
        "_score" : 1.0,
        "_source" : {
          "address" : {
            "building" : "265-15",
            "coord" : [
              -73.7032601,
              40.7386417
            ],
            "street" : "Hillside Avenue",
            "zipcode" : "11004"
          },
          "borough" : "Queens",
          "cuisine" : "Ice Cream, Gelato, Yogurt, Ices",
          "name" : "Carvel Ice Cream",
          "restaurant_id" : "40361322"
        }
      },
      {
        "_index" : "test",
        "_type" : "restaurants",
        "_id" : "5a3d8711531657e36b043ebc",
        "_score" : 1.0,
        "_source" : {
          "address" : {
            "building" : "502",
            "coord" : [
              -73.976112,
              40.786714
            ],
            "street" : "Amsterdam Avenue",
            "zipcode" : "10024"
          },
          "borough" : "Manhattan",
          "cuisine" : "Chicken",
          "name" : "Harriet'S Kitchen",
          "restaurant_id" : "40362098"
        }
      },
      {
        "_index" : "test",
        "_type" : "restaurants",
        "_id" : "5a3d8711531657e36b043ec6",
        "_score" : 1.0,
        "_source" : {
          "address" : {
            "building" : "7615",
            "coord" : [
              -74.0228449,
              40.6281815
            ],
            "street" : "5 Avenue",
            "zipcode" : "11209"
          },
          "borough" : "Brooklyn",
          "cuisine" : "American",
          "name" : "Mejlander & Mulgannon",
          "restaurant_id" : "40363117"
        }
      },
      ...
Notice that the grades object does not exist in the Elasticsearch documents: our transformation worked. Of course, you can apply much more complex transformations in the same way, and you can chain multiple transformers in the pipeline, as sketched below.
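For example, here is a sketch that chains a second native transformer, rename, after omit; the field_map below is purely illustrative:
// Chain two transformers: drop grades, then rename a field (illustrative mapping)
t.Source("source", source, "/.*/")
  .Transform(omit({ "fields": ["grades"] }))
  .Transform(rename({ "field_map": { "name": "restaurant_name" } }))
  .Save("sink", sink, "/.*/")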