Subscriptions

GraphQL subscriptions let a server push data to clients that have requested real-time messages. Subscriptions are similar to queries in that they specify a set of fields to be delivered to a client, but instead of immediately returning a single answer, a result is sent every time a particular event happens on the server.
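The difference between the two delivery models can be sketched with the Rust standard library alone. This is only an illustration and not Juniper's API: `query_greeting` and `subscribe_greetings` are hypothetical names, and a `std::sync::mpsc` channel stands in for the long-lived connection between server and client.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Query-like operation (hypothetical): one request, one immediate answer.
pub fn query_greeting() -> String {
    String::from("Hello World!")
}

/// Subscription-like operation (hypothetical): the caller gets a receiver
/// and one result arrives on it for every event the "server" thread emits.
pub fn subscribe_greetings() -> Receiver<String> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Each loop iteration stands in for one server-side event.
        for word in ["Hello", "World!"] {
            // Stop emitting if the client has gone away.
            if tx.send(String::from(word)).is_err() {
                break;
            }
        }
        // Dropping `tx` here closes the channel, ending the subscription.
    });
    rx
}
```

Calling `subscribe_greetings().iter()` yields each emitted value in turn and finishes once the sending side is dropped, whereas `query_greeting()` returns exactly once.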

In order to execute subscriptions in Juniper, we need a coordinator (spawning long-lived connections) and a GraphQL object with fields resolving into a Stream of elements which will then be returned to a client. The juniper_subscriptions crate provides a default implementation of these abstractions.

The subscription root is just a GraphQL object, similar to the query root and mutation root that we define for operations in our GraphQL schema. For subscriptions, all fields should be async and return a Stream of values of some GraphQL type, rather than direct values.

extern crate futures;
extern crate juniper;
use std::pin::Pin;
use futures::Stream;
use juniper::{graphql_object, graphql_subscription, FieldError};

#[derive(Clone)]
pub struct Database;

impl juniper::Context for Database {}

pub struct Query;

#[graphql_object]
#[graphql(context = Database)]
impl Query {
    fn hello_world() -> &'static str {
        "Hello World!"
    }
}

type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;

pub struct Subscription;

#[graphql_subscription]
#[graphql(context = Database)]
impl Subscription {
    // This subscription operation emits two values sequentially:
    // the `String`s "Hello" and "World!".
    async fn hello_world() -> StringStream {
        let stream = futures::stream::iter([
            Ok(String::from("Hello")),
            Ok(String::from("World!")),
        ]);
        Box::pin(stream)
    }
}

fn main() {}

Coordinator

GraphQL subscriptions require more resources than regular queries and are an attractive vector for DoS attacks, which can easily bring down a server if not handled correctly. The SubscriptionCoordinator trait provides coordination logic to enable functionality such as DoS attack mitigation and resource limits.

The SubscriptionCoordinator contains the schema and can keep track of opened connections, handle subscription start and end, and maintain a global ID for each subscription. Each time a connection is established, the SubscriptionCoordinator spawns a SubscriptionConnection, which handles a single connection, providing resolver logic for a client stream as well as reconnection and shutdown logic.
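To make the bookkeeping described above more concrete, here is a minimal sketch of what tracking open connections, assigning global IDs, and enforcing a resource limit could look like. It uses only the standard library and is not the SubscriptionCoordinator trait or any Juniper type: `ConnectionRegistry`, `LimitError`, and their methods are hypothetical names chosen for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical error returned when the connection limit is reached.
#[derive(Debug, PartialEq)]
pub enum LimitError {
    TooManyConnections { limit: usize },
}

/// Hypothetical bookkeeping a coordinator implementation might keep.
/// NOT part of Juniper; it only illustrates the responsibilities above.
pub struct ConnectionRegistry {
    /// Maximum number of simultaneously open subscriptions.
    limit: usize,
    /// Next global subscription ID to hand out (IDs are never reused).
    next_id: u64,
    /// Open subscriptions, keyed by global ID, with their operation text.
    open: HashMap<u64, String>,
}

impl ConnectionRegistry {
    pub fn new(limit: usize) -> Self {
        Self { limit, next_id: 0, open: HashMap::new() }
    }

    /// Registers the start of a subscription, refusing it when the limit
    /// is already reached (a crude form of DoS mitigation).
    pub fn start(&mut self, operation: &str) -> Result<u64, LimitError> {
        if self.open.len() >= self.limit {
            return Err(LimitError::TooManyConnections { limit: self.limit });
        }
        let id = self.next_id;
        self.next_id += 1;
        self.open.insert(id, operation.to_owned());
        Ok(id)
    }

    /// Registers the end of a subscription; returns `false` for unknown IDs.
    pub fn end(&mut self, id: u64) -> bool {
        self.open.remove(&id).is_some()
    }

    /// Number of currently open subscriptions.
    pub fn open_count(&self) -> usize {
        self.open.len()
    }
}
```

With a limit of 2, a third call to `start` is rejected until one of the earlier subscriptions is passed to `end`. A real coordinator would additionally hold the schema and drive the connection itself, which this sketch leaves out.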

While we can implement SubscriptionCoordinator ourselves, Juniper contains a simple and generic implementation called Coordinator. Its subscribe method returns a Future resolving into a Result<Connection, GraphQLError>, where Connection is a Stream of values returned by the operation, and GraphQLError is the error returned when the subscription operation fails.

extern crate futures;
extern crate juniper;
extern crate juniper_subscriptions;
extern crate serde_json;
use std::pin::Pin;
use futures::{Stream, StreamExt as _};
use juniper::{
    http::GraphQLRequest,
    graphql_object, graphql_subscription,
    DefaultScalarValue, EmptyMutation, FieldError,
    RootNode, SubscriptionCoordinator,
};
use juniper_subscriptions::Coordinator;

#[derive(Clone)]
pub struct Database;

impl juniper::Context for Database {}

impl Database {
    fn new() -> Self {
        Self
    }
}

pub struct Query;

#[graphql_object]
#[graphql(context = Database)]
impl Query {
    fn hello_world() -> &'static str {
        "Hello World!"
    }
}

type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send>>;

pub struct Subscription;

#[graphql_subscription]
#[graphql(context = Database)]
impl Subscription {
    async fn hello_world() -> StringStream {
        let stream = futures::stream::iter([
            Ok(String::from("Hello")),
            Ok(String::from("World!")),
        ]);
        Box::pin(stream)
    }
}

type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;

fn schema() -> Schema {
    Schema::new(Query, EmptyMutation::new(), Subscription)
}

async fn run_subscription() {
    let schema = schema();
    let coordinator = Coordinator::new(schema);
    let db = Database::new();

    let req: GraphQLRequest<DefaultScalarValue> = serde_json::from_str(
        r#"{
            "query": "subscription { helloWorld }"
        }"#,
    ).unwrap();
    
    let mut conn = coordinator.subscribe(&req, &db).await.unwrap();
    while let Some(result) = conn.next().await {
        println!("{}", serde_json::to_string(&result).unwrap());
    }
}

fn main() {}

WebSocket

For information about serving GraphQL subscriptions over WebSocket, see the "Serving" chapter.