Writing a Connector

Introduction

To use the Curiosity APIs from C#, you need to install the Curiosity.Library package to your project.

dotnet add package Curiosity.Library

For Python, use the Curiosity package from pypi:

pip install curiosity

You can download a template project directly from your workspace. You'll also need to generate an API token in the API integrations page. This token should be kept secret, as it has full access to your data.

The following code snippet shows how to create a graph object that can then be used to perform data operations. You can check the available methods in the API overview section.

using Curiosity.Library;
using Microsoft.Extensions.Logging;

var token = Environment.GetEnvironmentVariable("CURIOSITY_API_TOKEN");
var url   = "http://your-workspace-address";

if (string.IsNullOrEmpty(token))
{
    PrintHelp();
    return -1;
}

var loggerFactory = LoggerFactory.Create(log => log.AddConsole());
var logger        = loggerFactory.CreateLogger("My Connector");

using (var graph = Graph.Connect(url, token, "My Connector")
                        .WithLoggingFactory(loggerFactory))
{
    try
    {
        await graph.LogAsync("Starting connector");
        await CreateSchemaAsync(graph);
        await IngestDataAsync(graph);
        await graph.CommitPendingAsync();
        await graph.LogAsync("Finished connector");
    }
    catch(Exception E)
    {
        await graph.LogErrorAsync(E.ToString());
        throw;
    }
}

async Task CreateSchemaAsync(Graph graph)
{
    // Create your schemas in the graph here
}

async Task IngestDataAsync(Graph graph)
{
    // Your data connector logic goes here
}

void PrintHelp() => Console.WriteLine("Missing API token, you can set it using the CURIOSITY_API_TOKEN environment variable.");

Creating your schema using the connector

You can define the schema of your graph directly in the data connector. Alternatively you can do the same from the user interface if you prefer, and omit this step in your data connector.

When you create a schema from the connector, it will never delete your existing data in case of conflicts due to non-compatible changes. You'll need to first manually delete the previous schema and then run the connector again. As long as the changes are compatible, you can add new fields to the schema, and they'll automatically be created next time you run, with default values for any existing data.

async Task CreateSchemaAsync(Graph graph)
{
    await graph.CreateNodeSchemaAsync<Book>();
    await graph.CreateNodeSchemaAsync<Author>();
    await graph.CreateNodeSchemaAsync<Customer>();
    await graph.CreateNodeSchemaAsync<Order>();
}

[Node]
public class Book
{
    [Key] public string Id    { get; set; }
    [Property] public string Title { get; set; }
    [Property] public string Genre { get; set; }
    [Property] public float Price  { get; set; }
    [Property] public float Rating { get; set; }
}

[Node]
public class Author
{
    [Key]      public string Id      { get; set; }
    [Property] public string Name    { get; set; }
    [Property] public string Website { get; set; }    
}

[Node]
public class Customer
{
    [Key]      public string Id      { get; set; }
    [Property] public string Name    { get; set; }
    [Property] public string Address { get; set; }
    [Property] public string Email   { get; set; }
    [Property] public string Phone   { get; set; }
}

[Node]
public class Order
{
    [Key]       public string Id           { get; set; }
    [Timestamp] public DateTimeOffset Date { get; set; }
    [Property]  public string Status       { get; set; }
    [Property]  public float  Total        { get; set; }
}

When working with edges, it is useful to structure your edge types with the help of a static class. This helps you avoid having strings for edge types everywhere, and helps with refactoring, so if you want to rename an edge type, all places in your connector will be automatically renamed.

async Task CreateSchemaAsync(Graph graph)
{
    //... create node schemas as above ...
    await graph.CreateEdgeSchemaAsync(Edges.HasAuthor, Edges.AuthorOf,
                                      Edges.HasOrder,  Edges.OrderOf,
                                      Edges.HasReview, Edges.ReviewOf,
                                      Edges.ReviewBy,  Edges.WroteReview);
}

public static class Edges
{
    public const string HasAuthor   = nameof(HasAuthor);
    public const string AuthorOf    = nameof(AuthorOf);
    public const string HasOrder    = nameof(HasOrder);
    public const string OrderOf     = nameof(OrderOf);
    public const string HasReview   = nameof(HasReview);
    public const string ReviewOf    = nameof(ReviewOf);
    public const string ReviewBy    = nameof(ReviewBy);
    public const string WroteReview = nameof(WroteReview);
}

If you create your edges directly in the Schema interface, you can also download such a helper class inside the API Integration template.

Ingesting data using the connector

There are two main steps when ingesting data into your workspace, creating the nodes and adding the relationships in the graph. The following code shows how you can add two distinct node types and link them together in the graph:

async Task IngestDataAsync(Graph graph)
{
    var booksPerAuthor = await FetchBooksAsync();
    
    foreach(var (author, books) in booksPerAuthor)
    {
        var authorNode = graph.AddOrUpdate(author);
        
        foreach(var book in books)
        {
            var bookNode = graph.AddOrUpdate(book);
            graph.Link(authorNode, bookNode, Edges.AuthorOf, Edges.HasAuthor);
        }
    }
}

async Task<Dictionary<Author, Book[]>> FetchBooksAsync()
{
    // ... fetch books and authors from your data source ...
}

For more information on available methods, please check the API overview section.

Last updated