Streaming and Trigger Support With GQLAlchemy


Working with a new technology stack is never easy, especially when you come from the Python ecosystem, where you get used to libraries that make your daily work easier. That's why we decided to add some missing functionality to GQLAlchemy, our Object Graph Mapper (OGM).

From now on, Python developers no longer have to create and manage Memgraph data streams and database triggers by writing Cypher queries directly. Instead, they can use the GQLAlchemy library to do it programmatically.

Let's see the new functionalities in action!

Prerequisites

1. If you want to start a dummy Kafka stream and actually connect to it with Memgraph, clone the data-streams project and run the following command:

python start.py --platform kafka --dataset movielens

2. From the same data-streams directory, run the following command to start Memgraph:

docker-compose up memgraph-mage

3. Now, we can connect to Memgraph using GQLAlchemy in a Python script:

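from gqlalchemy import Memgraph, Node, Field

# Connect to Memgraph (by default on 127.0.0.1:7687)
memgraph = Memgraph()

class User(Node):
    # Index the name property and add existence and uniqueness constraints for it
    name: str = Field(index=True, exists=True, unique=True, db=memgraph)

# Save the node to the database and print the object returned by Memgraph
user = User(name='Ron Swanson').save(memgraph)
print(user)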

What happened? Well, we just created a node with the label User in the database and fetched it to our program. Now it's time to dive into streams and triggers!
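The index, exists and unique flags on Field are what tie the User model to Memgraph's schema. As we understand it, GQLAlchemy sets up the matching index and constraints for User.name when the class is defined with db=memgraph. The sketch below only shows what those schema objects look like in Memgraph's Cypher; schema_queries() is a hypothetical helper for illustration, not part of GQLAlchemy.

```python
from typing import List


def schema_queries(label: str, prop: str, index: bool = False,
                   exists: bool = False, unique: bool = False) -> List[str]:
    """Hypothetical helper: build Memgraph Cypher statements that correspond
    to Field(index=..., exists=..., unique=...) on a node property."""
    queries = []
    if index:
        # Label-property index, speeds up lookups such as MATCH (n:User {name: ...})
        queries.append(f"CREATE INDEX ON :{label}({prop});")
    if exists:
        # Existence constraint: every node with this label must have the property
        queries.append(f"CREATE CONSTRAINT ON (n:{label}) ASSERT EXISTS (n.{prop});")
    if unique:
        # Uniqueness constraint: no two nodes with this label share the same value
        queries.append(f"CREATE CONSTRAINT ON (n:{label}) ASSERT n.{prop} IS UNIQUE;")
    return queries


for query in schema_queries("User", "name", index=True, exists=True, unique=True):
    print(query)
```

Because of the uniqueness constraint, a second User with the name 'Ron Swanson' should be rejected by the database rather than silently duplicated.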

Connecting to a data stream from GQLAlchemy

The stream functionality enables Memgraph to connect to a Kafka, Pulsar or Redpanda cluster and run graph analytics on the data stream.

1. Create a stream in Memgraph

This step is pretty easy. Describe the stream with a MemgraphKafkaStream object and pass it to the create_stream() method:

from gqlalchemy import Memgraph, MemgraphKafkaStream, match

memgraph = Memgraph()

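# Describe the Kafka stream: the topics to consume and the transformation
# procedure that turns each incoming message into Cypher queries
stream = MemgraphKafkaStream(
    name="ratings_stream",
    topics=["ratings"],
    transform="movielens.rating",
    bootstrap_servers="'kafka:9092'",
)
# Register the stream in Memgraph (it does not start consuming yet)
memgraph.create_stream(stream)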
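Note the nested quotes in bootstrap_servers="'kafka:9092'". Presumably the value is spliced into a Cypher query as-is, and in Memgraph's CREATE KAFKA STREAM syntax the server list is a string literal. The sketch below shows roughly what such a query looks like; kafka_stream_query() is a hypothetical helper, not GQLAlchemy's actual implementation, and it covers only the options used in this article.

```python
from typing import List, Optional


def kafka_stream_query(name: str, topics: List[str], transform: str,
                       bootstrap_servers: Optional[str] = None) -> str:
    """Hypothetical helper: assemble a Memgraph CREATE KAFKA STREAM query."""
    query = f"CREATE KAFKA STREAM {name} TOPICS {','.join(topics)} TRANSFORM {transform}"
    if bootstrap_servers is not None:
        # Inserted verbatim, so the value must already carry its own quotes
        query += f" BOOTSTRAP_SERVERS {bootstrap_servers}"
    return query + ";"


print(kafka_stream_query("ratings_stream", ["ratings"], "movielens.rating", "'kafka:9092'"))
# prints: CREATE KAFKA STREAM ratings_stream TOPICS ratings TRANSFORM movielens.rating BOOTSTRAP_SERVERS 'kafka:9092';
```

Without the inner quotes, kafka:9092 would appear unquoted in the query and would not parse as a string.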

2. Start the stream

To start the stream, just call the start_stream() method:

memgraph.start_stream(stream)

Now, let's check if data is being ingested by Memgraph:

movies = match().node(variable="m", labels="Movie").return_().limit(5).execute()
print(list(movies))

If everything works, you should see a few Movie nodes printed out. If not, something went wrong along the way; you can always ask for help on our Discord server.
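The match() chain is a query builder: each method call adds a clause, and execute() runs the assembled Cypher against Memgraph. The toy class below is not GQLAlchemy's builder; it is a minimal sketch of the same fluent pattern, assuming the chain above corresponds to MATCH (m:Movie) RETURN * LIMIT 5. ToyQueryBuilder and its build() method are hypothetical names.

```python
from typing import List


class ToyQueryBuilder:
    """Hypothetical fluent builder mimicking the shape of match().node().return_().limit()."""

    def __init__(self) -> None:
        self._parts: List[str] = []

    def match(self) -> "ToyQueryBuilder":
        self._parts.append("MATCH")
        return self  # returning self is what makes chaining possible

    def node(self, variable: str = "", labels: str = "") -> "ToyQueryBuilder":
        label_part = f":{labels}" if labels else ""
        self._parts.append(f"({variable}{label_part})")
        return self

    def return_(self, *results: str) -> "ToyQueryBuilder":
        # With no arguments, return everything that was matched
        self._parts.append("RETURN " + (", ".join(results) if results else "*"))
        return self

    def limit(self, n: int) -> "ToyQueryBuilder":
        self._parts.append(f"LIMIT {n}")
        return self

    def build(self) -> str:
        return " ".join(self._parts) + ";"


query = ToyQueryBuilder().match().node(variable="m", labels="Movie").return_().limit(5)
print(query.build())
# prints: MATCH (m:Movie) RETURN * LIMIT 5;
```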

3. Check the status of the stream

To check the status of the streams in Memgraph, call the get_streams() method:

streams = memgraph.get_streams()
print(streams)

4. Delete the stream

You can use the drop_stream() method to delete a stream:

memgraph.drop_stream(stream)
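If you ever need to manage the same stream outside Python, for example from mgconsole, each step above has a Cypher counterpart in Memgraph. The sketch below maps lifecycle actions to those clauses. STREAM_COMMANDS and stream_command() are hypothetical names, STOP STREAM is included only for completeness, and we are assuming, not quoting, that GQLAlchemy issues these exact statements.

```python
from typing import Optional

# Hypothetical mapping from lifecycle actions to Memgraph's stream clauses
STREAM_COMMANDS = {
    "start": "START STREAM {name};",
    "stop": "STOP STREAM {name};",
    "drop": "DROP STREAM {name};",
    "show": "SHOW STREAMS;",
}


def stream_command(action: str, name: Optional[str] = None) -> str:
    """Return the Cypher clause for a stream action (sketch)."""
    template = STREAM_COMMANDS[action]
    if "{name}" in template and not name:
        raise ValueError(f"'{action}' needs a stream name")
    return template.format(name=name)


for action in ("start", "show", "drop"):
    print(stream_command(action, "ratings_stream"))
```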

Creating database triggers in GQLAlchemy

Because Memgraph supports database triggers on CREATE, UPDATE and DELETE operations, GQLAlchemy also implements a simple interface for maintaining these triggers.

Why do you need database triggers for graph analytics? Imagine a graph that is being updated continuously with new data. Maybe you need to inform another service of each change or run a graph algorithm after particular changes take effect. Triggers make it possible to create custom notifications, and if you write your own query module, you can execute whatever code you want once the trigger fires. You could be sending data to a Kafka cluster, calling a remote API, saving the information to another system, etc.

1. Create the trigger

To set up a trigger, first create a MemgraphTrigger object with all the required arguments:

  • name: str ➡ The name of the trigger.

  • event_type: TriggerEventType ➡ The type of event that will trigger the execution. The options are: TriggerEventType.CREATE, TriggerEventType.UPDATE and TriggerEventType.DELETE.

  • event_object: TriggerEventObject ➡ The objects affected by the event_type. The options are: TriggerEventObject.ALL, TriggerEventObject.NODE and TriggerEventObject.RELATIONSHIP.

  • execution_phase: TriggerExecutionPhase ➡ The phase in which the trigger is executed, relative to the transaction commit. The options are: TriggerExecutionPhase.BEFORE and TriggerExecutionPhase.AFTER.

  • statement: str ➡ The Cypher query that should be executed when the trigger fires.
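To see how these five arguments fit together, here is a sketch of the Memgraph CREATE TRIGGER query they roughly correspond to. The enums are local stand-ins that mirror the option names listed above (they are not the GQLAlchemy classes), and trigger_query() is a hypothetical helper. In Memgraph's syntax, () restricts a trigger to nodes, --> restricts it to relationships, and omitting the object fires it for both.

```python
from enum import Enum


# Local stand-ins mirroring the option names above (not the GQLAlchemy classes)
class EventType(Enum):
    CREATE = "CREATE"
    UPDATE = "UPDATE"
    DELETE = "DELETE"


class EventObject(Enum):
    ALL = ""              # no object filter: nodes and relationships
    NODE = "()"
    RELATIONSHIP = "-->"


class ExecutionPhase(Enum):
    BEFORE = "BEFORE"
    AFTER = "AFTER"


def trigger_query(name: str, event_type: EventType, event_object: EventObject,
                  phase: ExecutionPhase, statement: str) -> str:
    """Hypothetical helper: assemble a Memgraph CREATE TRIGGER query."""
    target = f"{event_object.value} " if event_object.value else ""
    return (f"CREATE TRIGGER {name} ON {target}{event_type.value} "
            f"{phase.value} COMMIT EXECUTE {statement};")


print(trigger_query("ratings_trigger", EventType.CREATE, EventObject.NODE,
                    ExecutionPhase.AFTER,
                    "UNWIND createdVertices AS node SET node.created_at = LocalDateTime()"))
```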

Now, let's create a trigger in GQLAlchemy:

from gqlalchemy import Memgraph, MemgraphTrigger
from gqlalchemy.models import (
    TriggerEventType,
    TriggerEventObject,
    TriggerExecutionPhase,
)

memgraph = Memgraph()

trigger = MemgraphTrigger(
    name="ratings_trigger",
    event_type=TriggerEventType.CREATE,
    event_object=TriggerEventObject.NODE,
    execution_phase=TriggerExecutionPhase.AFTER,
    statement="UNWIND createdVertices AS node SET node.created_at = LocalDateTime()",
)

memgraph.create_trigger(trigger)

The trigger named ratings_trigger fires every time a node is created in the database. After the transaction that created the node commits, the Cypher statement executes and, in this case, sets the created_at property of each newly created node to the current date and time.
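The createdVertices variable in the statement is one of the predefined variables Memgraph exposes to trigger statements, and which one is relevant depends on the event type and object. The lookup below is a small sketch of that mapping; PREDEFINED_VARIABLES and affected_variable() are hypothetical helpers. Memgraph also offers finer-grained variables (such as setVertexProperties) that are left out here, and some variables hold event maps rather than plain nodes, so check their shape in the Memgraph docs before UNWINDing them.

```python
# Hypothetical lookup: which Memgraph predefined trigger variable lists the
# affected objects for a given (event type, event object) pair
PREDEFINED_VARIABLES = {
    ("CREATE", "NODE"): "createdVertices",
    ("CREATE", "RELATIONSHIP"): "createdEdges",
    ("CREATE", "ALL"): "createdObjects",
    ("UPDATE", "NODE"): "updatedVertices",
    ("UPDATE", "RELATIONSHIP"): "updatedEdges",
    ("UPDATE", "ALL"): "updatedObjects",
    ("DELETE", "NODE"): "deletedVertices",
    ("DELETE", "RELATIONSHIP"): "deletedEdges",
    ("DELETE", "ALL"): "deletedObjects",
}


def affected_variable(event_type: str, event_object: str) -> str:
    """Return the predefined variable name for an event (sketch)."""
    return PREDEFINED_VARIABLES[(event_type.upper(), event_object.upper())]


print(affected_variable("create", "node"))  # createdVertices
```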

2. Check the status of a trigger

You can return all of the triggers from the database with the get_triggers() method:

triggers = memgraph.get_triggers()
print(triggers)

3. Delete the trigger

You can use the drop_trigger() method to delete a trigger:

memgraph.drop_trigger(trigger)

Conclusion

As you can see, working with streams and triggers in Memgraph isn't particularly hard, and GQLAlchemy makes the job even easier. This article covered only some of the options. You can also connect to Pulsar and Redpanda streams, and with query modules you can write custom procedures in Python to further analyze the incoming data or pass the results on. The same goes for triggers: you aren't limited to a Cypher query, because a trigger can also call custom procedures that are limited only by your knowledge of Python.

If you found this short tutorial interesting, don't forget to check out the GQLAlchemy project on GitHub and give us a star.

Read more about real-time analytics on memgraph.com