Importing data into the Neo4j graph database with APOC

We have recently been using Neo4j as a lightweight analytics reporting tool for a micro-services platform.  Here you can find some alternative options for importing data using the plugins from the community-driven APOC libraries, including hiding connection credentials and wrapping large data loads in periodic commits for performance tuning.

Before investing heavily in a more costly data warehouse or data lake solution, we wanted to gather analytics intelligence with minimal investment.  The idea was to hydrate and join data from approximately 20 sources on a regular basis into an embedded Neo4j store (bundled into another micro-service) and then generate cross-cutting reports and simple dashboards.  It was a ‘tactical analytics’ solution on an extreme budget.

Our data came from a mix of micro-service databases and API-based sources.  Neo4j’s out-of-the-box import options require either that the import happens before you start the database or that CSV files are available (using the LOAD CSV command).  In this blog we will use the APOC procedures to load directly from SQL at runtime, with the same principles usable for API calls.

Follow along using a clean install of Neo4j Community Edition and the excellent Neo4j Browser interface.  While the blog assumes a basic knowledge of Neo4j, it should not take long to get up and running even if you’ve never used it before.  See the links at the bottom for other downloads and reference material.

Step 1 – Add APOC and database drivers to your Neo4j instance

Add the APOC jar to your Neo4j server plugins folder.  Make sure you download the APOC jar version that bundles all dependencies, and not just the standard apoc jar.  The plugins folder must also contain the drivers for any databases you intend to query, for example…

apoc-3.3.0.1-all.jar
h2-1.4.196.jar
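
Once the jars are in place and the server restarted, a quick way to confirm the procedures are visible is to ask APOC itself; apoc.help lists any procedures matching a search term…

CALL apoc.help('jdbc')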

When running Neo4j in embedded mode you can instead add the Maven dependencies to your pom.xml…

<dependency>
    <groupId>org.neo4j.procedure</groupId>
    <artifactId>apoc</artifactId>
    <version>3.3.0.1</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.196</version>
</dependency>

Here we will use the H2 library for test purposes, somewhat ironically by loading from a CSV file via SQL, so we don’t need to populate a real database to verify that JDBC SQL is working.

Before we can load data from any JDBC source we need to load the JDBC driver using the following cypher statement (replacing the driver classname as appropriate)…

CALL apoc.load.driver("org.h2.Driver")
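
With the driver registered, a quick sanity check against the same in-memory H2 URL we use below confirms the JDBC round trip is working before you point it at a real database…

CALL apoc.load.jdbc('jdbc:h2:mem:test;DB_CLOSE_DELAY=-1', 'SELECT 1 AS OK') YIELD row
RETURN row.OK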

Step 2 – Load data from JDBC source

We’ll start with a simple apoc.load.jdbc call to load some data via JDBC SQL and map it into nodes and properties.  You can first test the SQL and see the result data on its own before adding any processing…

CALL apoc.load.jdbc('jdbc:h2:mem:test;DB_CLOSE_DELAY=-1',"
  select * from CSVREAD('~/Downloads/SampleCSVFile_2kb.csv') 
") YIELD row

The apoc.load.jdbc procedure simply takes a JDBC URL and a query statement to execute.  The ‘YIELD row’ after the procedure will stream each row of data from the SQL, making the columns accessible via the ‘row’ cypher variable.  So a complete load from JDBC and graph hydration would look like this…

CALL apoc.load.jdbc('jdbc:h2:mem:test;DB_CLOSE_DELAY=-1',"
  select * from CSVREAD('~/Downloads/SampleCSVFile_2kb.csv')
") YIELD row
MERGE (p:Product {productId: row.PRODUCT})
SET p.price = row.PRICE
MERGE (u:User {name: row.FULLNAME})
MERGE (u)-[:SUPPLIES]->(p)

Tip:  Mixing double and single quotes lets you pass the SQL as a cypher string argument while still containing quoted literals in the SQL itself.
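
As an aside, if you plan to re-run these MERGE imports it is worth adding unique constraints on the merge keys first; they keep the lookups fast and guard against duplicate nodes (Neo4j 3.x syntax, run each as a separate statement)…

CREATE CONSTRAINT ON (p:Product) ASSERT p.productId IS UNIQUE
CREATE CONSTRAINT ON (u:User) ASSERT u.name IS UNIQUE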

Step 3 – Hiding the database URL using global static variables

It’s not ideal to have your fully qualified JDBC URL exposed in every call to load data.  Fortunately we can use the apoc.static feature to store and reference this.

Before we can store global variables we must allow apoc procedures unrestricted access to internal Neo4j database server capabilities.  We do this by modifying the neo4j.conf file with the following…

dbms.security.procedures.unrestricted=apoc.*

Now we can pre-load the JDBC URL into a global variable on the Neo4j server (don’t be surprised when the result is ‘null’; this means it actually worked)…

call apoc.static.set('TEST_H2', 'jdbc:h2:mem:test;DB_CLOSE_DELAY=-1')

and check you can get the value thereafter…

call apoc.static.get('TEST_H2')

Now we can modify the load command to reference the static URL value on the server, rather than coding it in every time…

CALL apoc.static.get('TEST_H2') yield value WITH apoc.convert.toString(value) AS DB_URL
CALL apoc.load.jdbc(DB_URL,"
   select * from CSVREAD('~/Downloads/SampleCSVFile_2kb.csv')
") YIELD row
MERGE (p:Product {productId: row.PRODUCT})
SET p.price = row.PRICE
MERGE (u:User {name: row.FULLNAME})
MERGE (u)-[:SUPPLIES]->(p)

Tip: we use apoc.convert.toString(value) before assigning it to a variable for later use.  This is required to avoid ambiguous type errors.
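
If you have several source databases you can also namespace the keys and pull them back together; apoc.static.getAll returns everything stored under a prefix (the ‘db’ prefix and the two URLs below are purely illustrative)…

call apoc.static.set('db.ORDERS', 'jdbc:h2:mem:orders;DB_CLOSE_DELAY=-1')
call apoc.static.set('db.USERS', 'jdbc:h2:mem:users;DB_CLOSE_DELAY=-1')
call apoc.static.getAll('db')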

Step 4 – Wrapping it all in Periodic Commits for Better Performance

Finally, you may find larger datasets take a long time to import.  Since the native PERIODIC COMMIT cannot be used to batch this import stream into performant chunks (it only works with LOAD CSV), we instead need to fall back on the apoc.periodic.iterate procedure.

This is where it gets a little messy, but here is the same import with periodic commits…

CALL apoc.periodic.iterate("

  CALL apoc.static.get('TEST_H2') yield value WITH apoc.convert.toString(value) AS DB_URL
  CALL apoc.load.jdbc(DB_URL,\"
    select * from CSVREAD('~/Downloads/SampleCSVFile_2kb.csv')
  \") YIELD row RETURN row

","

  MERGE (p:Product {productId: row.PRODUCT})
  SET p.price = row.PRICE
  MERGE (u:User {name: row.FULLNAME})
  MERGE (u)-[:SUPPLIES]->(p)

", {batchSize:1000, parallel:false})

The apoc.periodic.iterate call breaks down as follows…

  • one cypher statement to return a stream of data
    • Notice the first half of our previous cypher is now provided as a string, further complicating the use of quotes, meaning the quotes for the SQL string must be escaped!
    • Secondly you will need to finish the statement with a RETURN, since this must be a returnable stream of data now
  • another cypher statement to process/merge that data
    • Our merge statement remains unchanged, but the issue with quotes is still relevant
  • batchSize setting to define how often to periodically commit the data
    • Defining the chunks of rows that will be processed between each commit
    • Notice this relates to ‘rows’ and not ‘nodes’, so be careful to consider how many nodes and relationships per row will be created within each batch transaction!
  • parallel setting to define whether to process blocks of data concurrently
    • Leaving this as false, since I was unable to get reliable results on Community Edition with this set to true
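
The procedure also returns summary columns, so appending a YIELD and RETURN to the closing line of the call above lets you confirm how many batches ran and whether any failed (column names as per the apoc.periodic.iterate signature)…

", {batchSize:1000, parallel:false})
YIELD batches, total, failedBatches
RETURN batches, total, failedBatches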

Conclusion

Neo4j’s standard data import capabilities are somewhat limited for dynamic graph hydration.  The routines in the community-led APOC library give you a lot more options.  While working with them can become a Java developer’s string-concatenation nightmare, it is possible to abstract that away and achieve acceptable results.

There are other loading options available, such as the apoc.load.xml and apoc.load.json procedures, which you can swap in for apoc.load.jdbc while using the same techniques.  Keeping credentials hidden and using periodic commits can make the difference when evaluating Neo4j as a secondary data store solution.
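
For example, hydrating from a JSON API follows much the same shape, with apoc.load.json yielding a ‘value’ map per document or array element (the URL and field names below are purely illustrative)…

CALL apoc.load.json('https://example.com/api/suppliers') YIELD value
MERGE (p:Product {productId: value.product})
SET p.price = value.price
MERGE (u:User {name: value.fullName})
MERGE (u)-[:SUPPLIES]->(p)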

Hopefully Neo4j will improve the native import capabilities and simplify the cypher syntax needed to perform such operations.  Keep an eye on the Neo4j Graph Platform and its ETL data integration tooling, which might yield the next generation of import capabilities.
