Sync data from MongoDB to Elasticsearch using Transporter

Recently, my team was asked to implement a web crawler that indexes the content of a few websites and stores that data in a database for further analysis. After a long discussion we settled on MongoDB to store the data and Elasticsearch to provide full-text search over it, a combination that plays to each tool's strengths.

In this post, I'll show how to use the open-source utility Transporter to quickly copy data from MongoDB to Elasticsearch with custom transformations.

Build Transporter

I won't go into detail on this part, since the Transporter documentation explains how to build it on both *nix and Windows systems. You can also use the Vagrant VM provisioned by Ansible.

Elasticsearch and Mongo

For this small example, I'll use Docker containers to run the Elasticsearch and MongoDB nodes, as defined in the following docker-compose.yml file:

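mongodb:
  image: mongo
  ports:
    - "27017:27017"

mongo-seed:
  # builds ./mongo-seed/Dockerfile, which imports data.json into MongoDB
  build: ./mongo-seed
  links:
    - mongodb

elasticsearch:
  # the official image publishes no "latest" tag, so you will likely need to
  # pin a version tag here that your Transporter build supports
  image: elasticsearch
  ports:
    - "9200:9200"
    - "9300:9300"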

The roles of the mongodb and elasticsearch services are (I hope) obvious; the mongo-seed service imports data into our mongodb container. Its Dockerfile, which lives in the ./mongo-seed directory, uses the mongoimport command to seed the data, creating a restaurants collection in the test database:

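FROM mongo

# data.json must sit next to this Dockerfile in ./mongo-seed (the build context)
COPY data.json /data.json
# --drop empties the restaurants collection first, so re-seeding does not duplicate documents
CMD mongoimport --host mongodb --db test --collection restaurants --drop --file /data.json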

You can find the dataset at this link; download it and save it as data.json in the ./mongo-seed directory, next to the Dockerfile, because COPY resolves paths relative to the build context.
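Before seeding, it can be worth sanity-checking data.json. The sketch below assumes the file holds one JSON document per line (the layout the restaurants sample dataset uses, as far as I know, and one that mongoimport accepts without --jsonArray). The function name summarizeNdjson is my own. It parses every line, counts the documents, and flags lines with no restaurant_id; the count it reports is also the number you would expect to see as hits.total in Elasticsearch after the sync.

```javascript
// Sketch: sanity-check a line-delimited JSON file (one document per line).
// Assumption: data.json uses this layout; summarizeNdjson is a hypothetical helper.
function summarizeNdjson(text) {
  let count = 0;
  const missing = []; // 1-based line numbers lacking a restaurant_id field
  text.split("\n").forEach((line, i) => {
    if (line.trim() === "") return; // skip blank lines, e.g. a trailing newline
    const doc = JSON.parse(line); // throws on a malformed line
    count += 1;
    if (!("restaurant_id" in doc)) missing.push(i + 1);
  });
  return { count, missing };
}

// Usage (Node.js):
//   const fs = require("fs");
//   console.log(summarizeNdjson(fs.readFileSync("mongo-seed/data.json", "utf8")));
```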

Running the Transformation

Now that the setup is done, it's time to sync and transform our data. Make sure both Elasticsearch and MongoDB are running (for example with docker-compose up -d)!
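Elasticsearch in particular can take a while to accept connections after its container starts. As a minimal sketch, here is a generic retry loop: waitFor is a hypothetical helper, and the defaults of 10 retries, 1 second apart, are arbitrary. Any failed or thrown probe is treated as "not ready yet".

```javascript
// Sketch: poll an async readiness probe until it returns true or retries run out.
// waitFor and its defaults are illustrative choices, not part of Transporter.
async function waitFor(probe, { retries = 10, delayMs = 1000 } = {}) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      if (await probe()) return attempt; // ready: report how many tries it took
    } catch (err) {
      // e.g. connection refused: treat as not ready yet and retry
    }
    if (attempt < retries) {
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw new Error(`service not ready after ${retries} attempts`);
}

// Usage (Node.js 18+, which provides a global fetch):
//   waitFor(() => fetch("http://localhost:9200").then((res) => res.ok))
//     .then((n) => console.log(`Elasticsearch up after ${n} attempt(s)`));
```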
Now run the following command:

transporter init mongodb elasticsearch

This generates a basic pipeline.js file in your current directory. A Transporter pipeline is a sequence of adaptors: it connects one source adaptor, any number of transformers, and any number of sink adaptors to create a path for messages to flow down. All of this is defined in pipeline.js.
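To make that message flow concrete, here is a conceptual model in plain JavaScript. It is not Transporter's API or implementation: runPipeline is a hypothetical function in which a source is any iterable of messages, each transformer maps a message to a new one (or to null to drop it), and every surviving message is delivered to all sinks.

```javascript
// Conceptual model only, not Transporter's API: source -> transformers -> sinks.
function runPipeline(source, transformers, sinks) {
  for (const msg of source) {
    let current = msg;
    for (const transform of transformers) {
      current = transform(current);
      if (current === null) break; // in this model a transformer may filter a message out
    }
    if (current === null) continue;
    for (const sink of sinks) sink(current); // fan out to every sink
  }
}
```

Modelling transformers as plain functions is what makes chaining cheap: adding a step is just appending another function to the list.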

After that, modify your generated pipeline.js to look like this:

// MongoDB source: read from the test database
var source = mongodb({
  "uri": "mongodb://127.0.0.1:27017/test"
})
// Elasticsearch sink: write into the test index
var sink = elasticsearch({
  "uri": "http://localhost:9200/test"
})
// Read every collection, drop the grades field, then save to Elasticsearch
t.Source("source", source, "/.*/")
  .Transform(omit({"fields": ["grades"]}))
  .Save("sink", sink, "/.*/")

source and sink are two JavaScript objects representing our nodes. For each one we set the required uri parameter, which tells Transporter where to read from or write to. The "/.*/" argument is a namespace filter, a regular expression matched against collection names, so every collection in the test database is synced. Finally, the omit transformer removes the grades field from each document, so it is never sent to Elasticsearch.
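Transporter implements omit itself, so the following is only a plain-JavaScript illustration of its effect on a single document. omitFields is a hypothetical name, and this sketch handles top-level fields only; it returns a copy and leaves the input untouched.

```javascript
// Illustration of omit({"fields": [...]}) on one document (top-level fields only).
// omitFields is hypothetical; it copies the document rather than mutating it.
function omitFields(doc, fields) {
  const copy = { ...doc };
  for (const field of fields) delete copy[field];
  return copy;
}

// Usage:
//   omitFields({ name: "Carvel Ice Cream", grades: [] }, ["grades"])
//   gives { name: "Carvel Ice Cream" }
```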

The final step is to execute the Transporter pipeline by running:

transporter run

And voilà! Now query the Elasticsearch node to verify that the data was copied with our transformation applied:

curl -XGET "localhost:9200/test/restaurants/_search?pretty=true"
// output
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 25359,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test",
        "_type" : "restaurants",
        "_id" : "5a3d8711531657e36b043eb5",
        "_score" : 1.0,
        "_source" : {
          "address" : {
            "building" : "265-15",
            "coord" : [
              -73.7032601,
              40.7386417
            ],
            "street" : "Hillside Avenue",
            "zipcode" : "11004"
          },
          "borough" : "Queens",
          "cuisine" : "Ice Cream, Gelato, Yogurt, Ices",
          "name" : "Carvel Ice Cream",
          "restaurant_id" : "40361322"
        }
      },
      {
        "_index" : "test",
        "_type" : "restaurants",
        "_id" : "5a3d8711531657e36b043ebc",
        "_score" : 1.0,
        "_source" : {
          "address" : {
            "building" : "502",
            "coord" : [
              -73.976112,
              40.786714
            ],
            "street" : "Amsterdam Avenue",
            "zipcode" : "10024"
          },
          "borough" : "Manhattan",
          "cuisine" : "Chicken",
          "name" : "Harriet'S Kitchen",
          "restaurant_id" : "40362098"
        }
      },
      {
        "_index" : "test",
        "_type" : "restaurants",
        "_id" : "5a3d8711531657e36b043ec6",
        "_score" : 1.0,
        "_source" : {
          "address" : {
            "building" : "7615",
            "coord" : [
              -74.0228449,
              40.6281815
            ],
            "street" : "5 Avenue",
            "zipcode" : "11209"
          },
          "borough" : "Brooklyn",
          "cuisine" : "American",
          "name" : "Mejlander & Mulgannon",
          "restaurant_id" : "40363117"
        }
      },
...
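A _search request without a size parameter returns only the first 10 hits, so eyeballing the output checks a sample. The sketch below scans a parsed _search response body for hits whose _source still carries an omitted field; hitsWithFields is a hypothetical helper. For a check across the whole index, a _count request on test with an exists query on the grades field should report a count of 0.

```javascript
// Sketch: return the _id of every hit whose _source still has one of `fields`.
// Works on the parsed JSON body of an Elasticsearch _search response.
function hitsWithFields(searchResponse, fields) {
  return searchResponse.hits.hits
    .filter((hit) => fields.some((f) => f in hit._source))
    .map((hit) => hit._id);
}
```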

Notice that the grades field does not exist in Elasticsearch: our transformation worked. You can apply much more complex transformations the same way, and you can chain several of them in one pipeline by calling .Transform() more than once.
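As an example of a more complex transformation, here is a sketch that summarises the grades before dropping them. It assumes the convention of Transporter's JavaScript (goja) transformer: a file that defines transform(msg), where the document lives under msg.data and the message is returned. Check that convention against the docs for your Transporter version. The fields grade_count and avg_score are hypothetical names of my own. The code sticks to ES5 syntax to stay within what an embedded JavaScript engine is most likely to accept.

```javascript
// transform.js: hypothetical goja-style transformer.
// Assumes the document is under msg.data and that the message must be returned.
function transform(msg) {
  var grades = msg.data.grades || [];
  var scores = grades
    .map(function (g) { return g.score; })
    .filter(function (s) { return typeof s === "number"; }); // skip null scores
  var sum = scores.reduce(function (a, b) { return a + b; }, 0);
  msg.data.grade_count = grades.length;
  msg.data.avg_score = scores.length ? sum / scores.length : null;
  delete msg.data.grades; // same effect as the omit step, after summarising
  return msg;
}
```

To wire it in, the Transporter docs describe a goja function that loads such a file, along the lines of .Transform(goja({"filename": "transform.js"})). Treat that call as an assumption to verify, not a tested configuration.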