How to document a Socket.IO API?

Dimitrios Dedoussis

AsyncAPI Conference 2021

Agenda

  • Intro
  • A story from real life
  • What is Socket.IO?
  • Modelling the Socket.IO protocol using AsyncAPI
  • In practice
  • Asynction
  • Q&A

About me

Asynction

How it all started

What is Socket.IO? 🤔

Features over plain WebSockets

  • reliability
  • automatic reconnection
  • packet buffering
  • acknowledgements
  • broadcasting
  • multiplexing

The Socket class

interface Socket {
  on(eventName: string, callback: Callback): void
  // A rest parameter must come last, so the optional `ack` callback is passed as the final element of `args`
  emit(eventName: string, ...args: any[]): void
}
on(eventName, callback)
  • Registers a new handler (callback) for a given event
  • Implements a subscribing operation
  • The return value of the callback is sent to the sender party as an acknowledgement
emit(eventName, ...args, ack)
  • Emits an event to the receiver party
  • Implements a publishing operation
  • The ack callback is invoked only if the receiver returns an acknowledgement
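
The on/emit/acknowledgement contract described above can be sketched in a few lines of Python. This is an in-memory toy, not the Socket.IO implementation: `ToySocket`, its `peer` attribute, and `connect_pair` are hypothetical names introduced for illustration, and no networking is involved.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class ToySocket:
    """In-memory stand-in for one end of a connection (hypothetical)."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[..., Any]] = {}
        self.peer: Optional["ToySocket"] = None

    def on(self, event_name: str, callback: Callable[..., Any]) -> None:
        # Register the handler that runs when the peer emits `event_name`.
        self.handlers[event_name] = callback

    def emit(
        self,
        event_name: str,
        *args: Any,
        ack: Optional[Callable[[Any], None]] = None,
    ) -> None:
        # Deliver the event to the peer's handler, if one is registered.
        if self.peer is None:
            return
        handler = self.peer.handlers.get(event_name)
        if handler is None:
            return
        result = handler(*args)
        # The handler's return value travels back as the acknowledgement.
        if ack is not None and result is not None:
            ack(result)


def connect_pair() -> Tuple[ToySocket, ToySocket]:
    """Create two sockets wired to each other."""
    left, right = ToySocket(), ToySocket()
    left.peer, right.peer = right, left
    return left, right
```

Registering a handler on one end with `on` and calling `emit(..., ack=callback)` on the other invokes `callback` with whatever the handler returned. If the handler returns nothing, the ack callback is never called.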

Namespace

  • Enables multiplexing capabilities
  • Has its own handlers (and potentially its own dedicated connection)
  • / is the default namespace; additional namespaces can be set up alongside it
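
Multiplexing can be pictured as a handler table keyed first by namespace and then by event name. The sketch below is a toy model in Python; `NamespaceRouter` is a hypothetical helper, not part of any Socket.IO library.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict

Handler = Callable[..., Any]


class NamespaceRouter:
    """Keeps a separate set of handlers for each namespace (hypothetical)."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, Dict[str, Handler]] = defaultdict(dict)

    def on(self, event_name: str, handler: Handler, namespace: str = "/") -> None:
        # "/" is the default namespace, mirroring the convention above.
        self._handlers[namespace][event_name] = handler

    def dispatch(self, namespace: str, event_name: str, *args: Any) -> Any:
        # Raises KeyError when the namespace has no handler for the event.
        return self._handlers[namespace][event_name](*args)
```

The same event name can therefore carry different behaviour in `/` and in `/admin`, which is what lets one application separate its concerns.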

Why document a Socket.IO API?

const io = require("socket.io")(3000);

io.on("connection", socket => {
  socket.emit("Hello!");
  socket.emit("greetings", "Hey!", { "ms": "jane" }, Buffer.from([4, 3, 3, 1]));

  socket.on("message", (data) => {
    console.log(data);
  });

  socket.on("salutations", (elem1, elem2, elem3) => {
    console.log(elem1, elem2, elem3);
  });
});

Not interested in

Modelling the Socket.IO protocol using AsyncAPI

Mapping AsyncAPI objects to semantics of the Socket.IO client API

AsyncAPI Object

This is the root document object

  • asyncapi: Spec version being used
  • channels: The available channels and messages for the API
  • servers: Provides connection details of servers
  • components: An element to hold various schemas

Channels

channels:
  /: {} # Channel Item Object
  /admin: {} # Channel Item Object
  • Addressable components where messages/events flow through
  • Enable the application to separate its concerns
  • Sounds very similar to a Socket.IO namespace ✅

Channel Item Object

channels:
  /:
    publish: {} # Operation object - Ignore this for now
    subscribe: {} # Operation object - Ignore this for now
    bindings:
      ws:
        query:
          type: object
          properties:
            token:
              type: string
          required: [token]

Bindings hold protocol-specific information. Here, the WebSockets channel binding documents the query parameters of the connection handshake.


const socket = io("wss://example.com", {
  query: {token: "admin"}
});
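
To make the role of the `query` schema concrete, here is a minimal Python sketch that checks a handshake query against it. It only understands `required` and string-typed properties, which is all the binding above uses; `query_errors` is a hypothetical helper, and a real project would more likely rely on a full JSON Schema validator.

```python
from typing import Any, Dict, List

# The `ws.query` schema from the channel binding above, as a Python dict.
QUERY_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {"token": {"type": "string"}},
    "required": ["token"],
}


def query_errors(schema: Dict[str, Any], query: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in the handshake query."""
    errors: List[str] = []
    for name in schema.get("required", []):
        if name not in query:
            errors.append(f"missing required query parameter: {name}")
    for name, prop in schema.get("properties", {}).items():
        if name in query and prop.get("type") == "string":
            if not isinstance(query[name], str):
                errors.append(f"query parameter {name} must be a string")
    return errors
```

A client connecting with `{"token": "admin"}` passes, while a connection without a token is reported as missing a required parameter.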
        

Conventions for bindings

  • Include bindings under the main namespace but omit them under the custom ones
  • If a custom namespace includes bindings, then the client should always force a new connection when connecting to it
  • WebSockets Channel Binding is the only possible binding for Socket.IO APIs
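
The second convention can be expressed as a small check over the `channels` section of a spec. The Python sketch below assumes the spec has already been loaded into a dict; `requires_new_connection` is a hypothetical helper name. In the JavaScript client, forcing a new connection would typically correspond to the `forceNew` option.

```python
from typing import Any, Dict


def requires_new_connection(channels: Dict[str, Any], namespace: str) -> bool:
    """True when a custom namespace declares its own bindings."""
    if namespace == "/":
        # The main namespace is covered by the first convention.
        return False
    return "bindings" in channels.get(namespace, {})
```

For example, with bindings declared under `/admin` but not under `/chat`, only `/admin` would call for a dedicated connection.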

In an ideal 🌎

Operations

Document how and why messages are sent/received...

publish ➡️ socket.emit

subscribe ➡️ socket.on

Message

channels:
  /:
    publish:  # socket.emit(eventName[, …args][, ack])
      message:
        name: hello  # eventName
        payload:  # args
          type: object
          properties:
            foo:
              type: string
    subscribe:  # socket.on(eventName, (...args) => {})
      message:
        name: hello  # eventName
        payload:  # args
          type: object
          properties:
            foo:
              type: string

Multiple messages


channels:
  /admin:
    publish:
      message:
        oneOf:
          - $ref: "#/components/messages/MessageOne"
          - $ref: "#/components/messages/MessageTwo"
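
Given this layout, finding the Message Object for a particular event means unwrapping `oneOf` and following local `$ref` pointers. A minimal Python sketch, assuming the spec is already loaded into a dict and that every reference is local (starts with `#/`); the function names are hypothetical.

```python
from typing import Any, Dict, List, Optional


def resolve_ref(spec: Dict[str, Any], node: Dict[str, Any]) -> Dict[str, Any]:
    """Follow a local `$ref` such as "#/components/messages/MessageOne"."""
    ref = node.get("$ref")
    if ref is None:
        return node
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are supported: {ref}")
    target: Any = spec
    for part in ref[2:].split("/"):
        target = target[part]
    return target


def operation_messages(
    spec: Dict[str, Any], namespace: str, operation: str
) -> List[Dict[str, Any]]:
    """List the messages of `publish` or `subscribe` under a namespace."""
    message = spec["channels"][namespace].get(operation, {}).get("message")
    if message is None:
        return []
    # A single message is treated as a one-element `oneOf`.
    candidates = message.get("oneOf", [message])
    return [resolve_ref(spec, candidate) for candidate in candidates]


def find_message(
    spec: Dict[str, Any], namespace: str, operation: str, event_name: str
) -> Optional[Dict[str, Any]]:
    """Return the message whose `name` equals the event name, if any."""
    for message in operation_messages(spec, namespace, operation):
        if message.get("name") == event_name:
            return message
    return None
```

Because the message `name` is the Socket.IO event name, this lookup is what ties an incoming event to its documented payload schema.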

ACK semantics

Specification Extension ⚠️

  • Message Objects MAY include the x-ack field. The value of this field SHOULD be a Message Ack Object.
  • Components Object MAY include the x-messageAcks field.

Message Ack Object

Field name | Type          | Description
args       | Schema Object | Schema of the arguments that are passed as input to the acknowledgement callback function

channels:
  /:
    publish:  # socket.emit(eventName[, …args][, ack])
      message:
        name: hello  # eventName
        payload:  # args
          type: object
          properties:
            foo:
              type: string
        x-ack:
          args:
            type: integer
    subscribe:  # socket.on(eventName, (...args) => {})
      message:
        name: hello  # eventName
        payload:  # args
          type: object
          properties:
            foo:
              type: string
        x-ack:
          args:
            type: integer
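
One way to read the `x-ack` extension is as a contract on the value handed to the ack callback. The Python sketch below checks only the top-level `type` keyword (including type lists such as `[string, "null"]`); `matches_type` and `ack_is_valid` are hypothetical helpers, and full schema validation is out of scope here.

```python
from typing import Any, Dict, List, Union

# JSON Schema type names mapped to the Python types that represent them.
JSON_TYPES: Dict[str, Any] = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
    "null": type(None),
}


def matches_type(value: Any, type_spec: Union[str, List[str]]) -> bool:
    """Check a value against a `type` keyword (a name or a list of names)."""
    names = type_spec if isinstance(type_spec, list) else [type_spec]
    for name in names:
        # In Python, bool is a subclass of int; JSON Schema keeps them apart.
        if name in ("integer", "number") and isinstance(value, bool):
            continue
        if isinstance(value, JSON_TYPES[name]):
            return True
    return False


def ack_is_valid(message: Dict[str, Any], ack_value: Any) -> bool:
    """True when the message documents an ack and the value fits its type."""
    x_ack = message.get("x-ack")
    if x_ack is None:
        return False  # no acknowledgement is documented for this message
    type_spec = x_ack.get("args", {}).get("type")
    return True if type_spec is None else matches_type(ack_value, type_spec)
```

With the `hello` message above, an ack of `3` is accepted, while `"3"` is rejected because the documented ack type is `integer`.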
        

Servers

servers:
  production:
    url: example.com/apis/socket.io
    protocol: wss
          

const socket = io("wss://example.com/my-namespace", {
  path: "/apis/socket.io" // defaults to "/socket.io"
});
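
The relationship between the Server Object and the client call can be sketched as a small function: the host goes into the connection URL together with the namespace, and the rest of the server URL becomes the `path` option. This is an illustrative Python sketch; `client_connection` is a hypothetical helper and it assumes the server `url` carries no scheme, as in the example above.

```python
from typing import Tuple


def client_connection(
    server_url: str, protocol: str, namespace: str = "/"
) -> Tuple[str, str]:
    """Return (connection URL, Socket.IO path option) for a namespace."""
    host, _, path = server_url.partition("/")
    url = f"{protocol}://{host}"
    if namespace != "/":
        url += namespace
    # Without an explicit path, clients fall back to "/socket.io".
    socketio_path = ("/" + path) if path else "/socket.io"
    return url, socketio_path
```

Called with `"example.com/apis/socket.io"`, `"wss"` and `"/my-namespace"`, it yields the URL and `path` used in the client snippet above.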
          

Summary

Minimal chat application

https://socket.io/demos/chat
// Setup basic express server
const express = require("express");
const app = express();
const path = require("path");
const server = require("http").createServer(app);
const io = require("socket.io")(server);
const port = process.env.PORT || 3000;

server.listen(port, () => {
  console.log("Server listening at port %d", port);
});

// Chatroom
let numUsers = 0;

io.on("connection", (socket) => {
  let addedUser = false;

  // when the client emits 'new message', this listens and executes
  socket.on("new message", (data) => {
    // we tell the client to execute 'new message'
    socket.broadcast.emit("new message", {
      username: socket.username,
      message: data,
    });
  });

  // when the client emits 'add user', this listens and executes
  socket.on("add user", (username, cb) => {
    if (addedUser) {
      cb({ error: "User is already added" });
      return;
    }

    // we store the username in the socket session for this client
    socket.username = username;
    ++numUsers;
    addedUser = true;
    socket.emit("login", {
      numUsers: numUsers,
    });
    // echo globally (all clients) that a person has connected
    socket.broadcast.emit("user joined", {
      username: socket.username,
      numUsers: numUsers,
    });
    cb({ error: null });
  });

  // when the client emits 'typing', we broadcast it to others
  socket.on("typing", () => {
    socket.broadcast.emit("typing", {
      username: socket.username,
    });
  });

  // when the client emits 'stop typing', we broadcast it to others
  socket.on("stop typing", () => {
    socket.broadcast.emit("stop typing", {
      username: socket.username,
    });
  });

  // when the user disconnects.. perform this
  socket.on("disconnect", () => {
    if (addedUser) {
      --numUsers;

      // echo globally that this client has left
      socket.broadcast.emit("user left", {
        username: socket.username,
        numUsers: numUsers,
      });
    }
  });
});

// Admin

io.of("/admin").on("connection", (socket) => {
  let token = socket.handshake.query.token;
  if (token !== "admin") socket.disconnect();

  socket.emit("server metric", {
    name: "CPU_COUNT",
    value: require("os").cpus().length,
  });
});

First things first


asyncapi: 2.2.0

info:
  title: Socket.IO chat service
  version: 1.0.0
  description: |
    This is one of the get-started demos listed on the Socket.IO website: https://socket.io/demos/chat/

servers:
  demo:
    url: socketio-chat-h9jt.herokuapp.com/socket.io
    protocol: wss
          

Channels


const io = require("socket.io")(server);

io.on("connection", (socket) => {
  // handlers
});

io.of("/admin").on("connection", (socket) => {
  // handlers
});
            

channels:
  /: {}
  /admin: {}
            
const io = require("socket.io")(server);

io.on("connection", (socket) => {

  socket.on("new message", (data) => {
    socket.broadcast.emit("new message", {
      username: socket.username,
      message: data,
    });
  });

  socket.on("add user", (username, cb) => {
    socket.broadcast.emit("user joined", {
      username: socket.username,
      numUsers: numUsers,
    });
    cb({ error: null });
  });

  socket.on("typing", () => {
    socket.broadcast.emit("typing", {
      username: socket.username,
    });
  });

  socket.on("stop typing", () => {
    socket.broadcast.emit("stop typing", {
      username: socket.username,
    });
  });

  socket.on("disconnect", () => {
    socket.broadcast.emit("user left", {
      username: socket.username,
      numUsers: numUsers,
    });
  });
});
            
channels:
  /:
    publish:
      message:
        oneOf:
          - $ref: "#/components/messages/NewMessage"
          - $ref: "#/components/messages/Typing"
          - $ref: "#/components/messages/StopTyping"
          - $ref: "#/components/messages/AddUser"
    subscribe:
      message:
        oneOf:
          - $ref: "#/components/messages/NewMessageReceived"
          - $ref: "#/components/messages/UserTyping"
          - $ref: "#/components/messages/UserStopTyping"
          - $ref: "#/components/messages/UserJoined"
          - $ref: "#/components/messages/UserLeft"
          - $ref: "#/components/messages/LogIn"
  /admin:
    subscribe:
      message: # No need to use `oneOf` since there is only a single event
        $ref: "#/components/messages/ServerMetric"
            
          

Channel bindings

               
io.of("/admin").on("connection", (socket) => {
  let token = socket.handshake.query.token;
  if (token !== "admin") socket.disconnect();

  socket.emit("server metric", {
    name: "CPU_COUNT",
    value: require("os").cpus().length,
  });
});
               
             
               
channels:
  /admin:
    subscribe:
      # ...
    bindings:
      ws:
        query:
          type: object
          properties:
            token:
              type: string
          required: [token]
               
             

Messages


socket.on(
  "add user", 
  (username, cb) => {
    if (addedUser) {
      cb({ error: "User is already added" });
      return;
    }

    ++numUsers;
    
    socket.broadcast.emit(
      "user joined", 
      {
        username: username,
        numUsers: numUsers,
      }
    );

    cb({ error: null });
  }
);
          
            
components:
  messages:
    AddUser:
      name: add user
      payload:
        type: string
      x-ack: # Documents that this event is always acknowledged by the receiver
        args:
          type: object
          properties:
            error:
              type: [string, "null"]
    UserJoined:
      name: user joined
      payload:
        type: object
        properties:
          username:
            type: string
          numUsers:
            type: integer
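
For comparison, the same handler logic can be written as a plain Python function whose return value plays the role of the ack, which makes the link to the `x-ack` schema easy to see. This is a simplified sketch for a single connection: `make_add_user_handler` and the injected `broadcast` callable are hypothetical, and no Socket.IO library is involved.

```python
from typing import Any, Callable, Dict, Optional

Broadcast = Callable[[str, Dict[str, Any]], None]


def make_add_user_handler(
    broadcast: Broadcast,
) -> Callable[[str], Dict[str, Optional[str]]]:
    """Build an `add user` handler bound to a broadcast function."""
    state = {"num_users": 0, "added": False}

    def add_user(username: str) -> Dict[str, Optional[str]]:
        # The payload is a plain string, as documented by `AddUser`.
        if state["added"]:
            return {"error": "User is already added"}
        state["num_users"] += 1
        state["added"] = True
        # Matches the `UserJoined` payload: username plus numUsers.
        broadcast(
            "user joined",
            {"username": username, "numUsers": state["num_users"]},
        )
        # Matches the `x-ack` schema: `error` is a string or null.
        return {"error": None}

    return add_user
```

Both return paths produce an object with a single `error` field that is either a string or `None`, which is exactly what the `[string, "null"]` type in the `x-ack` definition describes.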
            
          

Putting everything together

github://dedoussis/asyncapi-socket.io-example
            
asyncapi: 2.2.0

info:
  title: Socket.IO chat demo service
  version: 1.0.0
  description: |
    This is one of the get-started demos presented on the Socket.IO website: https://socket.io/demos/chat/

servers:
  demo:
    url: socketio-chat-h9jt.herokuapp.com/socket.io
    protocol: wss

channels:
  /:
    publish:
      message:
        oneOf:
          - $ref: "#/components/messages/NewMessage"
          - $ref: "#/components/messages/Typing"
          - $ref: "#/components/messages/StopTyping"
          - $ref: "#/components/messages/AddUser"
    subscribe:
      message:
        oneOf:
          - $ref: "#/components/messages/NewMessageReceived"
          - $ref: "#/components/messages/UserTyping"
          - $ref: "#/components/messages/UserStopTyping"
          - $ref: "#/components/messages/UserJoined"
          - $ref: "#/components/messages/UserLeft"
          - $ref: "#/components/messages/LogIn"
  /admin:
    subscribe:
      message: # No need to use `oneOf` since there is only a single event
        $ref: "#/components/messages/ServerMetric"
    bindings:
      $ref: "#/components/channelBindings/AuthenticatedWsBindings"

components:
  messages:
    NewMessage:
      name: new message
      payload:
        type: string
    Typing:
      name: typing
    StopTyping:
      name: stop typing
    AddUser:
      name: add user
      payload:
        type: string
      x-ack: # Documents that this event is always acknowledged by the receiver
        args:
          type: object
          properties:
            error:
              type: [string, "null"]
    NewMessageReceived:
      name: new message
      payload:
        type: object
        properties:
          username:
            type: string
          message:
            type: string
    UserTyping:
      name: typing
      payload:
        type: object
        properties:
          username:
            type: string
    UserStopTyping:
      name: stop typing
      payload:
        type: object
        properties:
          username:
            type: string
    UserJoined:
      name: user joined
      payload:
        type: object
        properties:
          username:
            type: string
          numUsers:
            type: integer
    UserLeft:
      name: user left
      payload:
        type: object
        properties:
          username:
            type: string
          numUsers:
            type: integer
    LogIn:
      name: login
      payload:
        type: object
        properties:
          numUsers:
            type: integer
    ServerMetric:
      name: server metric
      payload:
        type: object
        properties:
          name:
            type: string
          value:
            type: number

  channelBindings:
    AuthenticatedWsBindings:
      ws:
        query:
          type: object
          properties:
            token:
              type: string
          required: [token]
              
            

AsyncAPI playground

Asynction ✨

https://github.com/dedoussis/asynction

Guarantees that your API will work in accordance with its documentation

Features

  • Registers all event and error handlers that are referenced within the API spec
  • Provides out-of-the-box validation on every Socket.IO interaction
  • Generates HTML-rendered docs, similar to the AsyncAPI playground
  • Mock server support
  • CLI
  • Authentication (coming soon)

Inspired by Connexion

              
openapi: 3.0.0
paths:
  /greeting/{name}:
    post:
      summary: Generate greeting
      operationId: hello.post_greeting
      responses:
        "200":
          description: greeting response
          content:
            text/plain:
              schema:
                type: string
      parameters:
        - name: name
          in: path
          required: true
          schema:
            type: string
              
            
              
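# hello.py: the module name must match the `operationId` prefix in the spec
import connexion


def post_greeting(name: str) -> str:
    return f"Hello {name}"


if __name__ == "__main__":
    # The guard stops the server from starting again when the spec's
    # `operationId` causes this module to be imported
    app = connexion.FlaskApp("app", specification_dir="openapi/")
    app.add_api("helloworld-api.yaml")
    app.run()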
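
A spec-first framework has to turn a dotted reference such as `hello.post_greeting` into a callable. The sketch below shows one plausible way to do that with the standard library; it is not Connexion's or Asynction's actual code, and `resolve_handler` is a hypothetical name.

```python
import importlib
from typing import Any, Callable


def resolve_handler(reference: str) -> Callable[..., Any]:
    """Import `package.module.function` and return the function."""
    module_name, _, attribute = reference.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got: {reference!r}")
    module = importlib.import_module(module_name)
    handler = getattr(module, attribute)
    if not callable(handler):
        raise TypeError(f"{reference!r} does not point to a callable")
    return handler
```

For instance, `resolve_handler("json.dumps")` returns the standard library's `json.dumps` function. A reference without a module part raises `ValueError` before any import is attempted.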
              
            
            
asyncapi: 2.2.0

info:
  title: Socket.IO chat demo service
  version: 1.0.0
  description: |
    This is one of the get-started demos presented on the Socket.IO website: https://socket.io/demos/chat/
servers:
  demo:
    url: socketio-chat-h9jt.herokuapp.com/socket.io
    protocol: wss

channels:
  /:
    publish:
      message:
        oneOf:
          - $ref: "#/components/messages/NewMessage"
          - $ref: "#/components/messages/Typing"
          - $ref: "#/components/messages/StopTyping"
          - $ref: "#/components/messages/AddUser"
    subscribe:
      message:
        oneOf:
          - $ref: "#/components/messages/NewMessageReceived"
          - $ref: "#/components/messages/UserTyping"
          - $ref: "#/components/messages/UserStopTyping"
          - $ref: "#/components/messages/UserJoined"
          - $ref: "#/components/messages/UserLeft"
          - $ref: "#/components/messages/LogIn"
    x-handlers:
      disconnect: app.disconnect
  /admin:
    subscribe:
      message: # No need to use `oneOf` since there is only a single event
        $ref: "#/components/messages/ServerMetric"
    bindings:
      $ref: "#/components/channelBindings/AuthenticatedWsBindings"
    x-handlers:
      connect: app.admin_connect

components:
  messages:
    NewMessage:
      name: new message
      x-handler: app.new_message
      payload:
        type: string
    Typing:
      name: typing
      x-handler: app.message_typing
    StopTyping:
      name: stop typing
      x-handler: app.stop_typing
    AddUser:
      name: add user
      payload:
        type: string
      x-handler: app.add_user
      x-ack: # Documents that this event is always acknowledged by the receiver
        args:
          type: object
          properties:
            error:
              type: [string, "null"]
    NewMessageReceived:
      name: new message
      payload:
        type: object
        properties:
          username:
            type: string
            format: first_name
          message:
            type: string
            format: sentence
        required: [username, message]
    UserTyping:
      name: typing
      payload:
        type: object
        properties:
          username:
            type: string
            format: first_name
        required: [username]
    UserStopTyping:
      name: stop typing
      payload:
        type: object
        properties:
          username:
            type: string
            format: first_name
        required: [username]
    UserJoined:
      name: user joined
      payload:
        type: object
        properties:
          username:
            type: string
            format: first_name
          numUsers:
            type: integer
        required: [username, numUsers]
    UserLeft:
      name: user left
      payload:
        type: object
        properties:
          username:
            type: string
            format: first_name
          numUsers:
            type: integer
        required: [username, numUsers]
    LogIn:
      name: login
      payload:
        type: object
        properties:
          numUsers:
            type: integer
        required: [numUsers]
    ServerMetric:
      name: server metric
      payload:
        type: object
        properties:
          name:
            type: string
          value:
            type: number
        required: [name, value]

  channelBindings:
    AuthenticatedWsBindings:
      ws:
        query:
          type: object
          properties:
            token:
              type: string
          required: [token]
            
          
            
from pathlib import Path

from flask import Flask
from asynction import AsynctionSocketIO

flask_app = Flask(__name__)
asio = AsynctionSocketIO.from_spec(
    spec_path=Path.cwd() / "asyncapi.yml",
    server_name="demo",
    app=flask_app,
)

if __name__ == "__main__":
    asio.run(app=flask_app)
            
          
            
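from flask import request
from flask_socketio import emit

# Maps each session ID to its username; assumed to be filled in by the `add user` handler
username_store = {}


def new_message(message):
    sid = request.sid
    username = username_store[sid]
    # Broadcast to every client except the sender
    emit(
        "new message",
        {"username": username, "message": message},
        broadcast=True,
        skip_sid=sid,
    )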
            
          

Full example

github://dedoussis/asynction/tree/main/example

Mock server


$ brew tap dedoussis/tap
$ brew install asynction
          

$ asynction --spec ./asyncapi.yml mock run --port 5001
* Restarting with stat
* Debugger is active!
* Debugger PIN: 339-844-897
(71320) wsgi starting up on http://0.0.0.0:5001
...
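
A mock server needs some way to produce payloads that fit the documented schemas. The Python sketch below shows the general idea with fixed placeholder values. It is an assumption-laden illustration, not Asynction's actual generator: `placeholder` is a hypothetical function, and it ignores keywords such as `format` and `required`.

```python
from typing import Any, Dict


def placeholder(schema: Dict[str, Any]) -> Any:
    """Build a fixed placeholder value that fits the schema's type."""
    schema_type = schema.get("type")
    if isinstance(schema_type, list):
        # For type lists such as [string, "null"], use the first entry.
        schema_type = schema_type[0]
    if schema_type == "object":
        return {
            name: placeholder(sub_schema)
            for name, sub_schema in schema.get("properties", {}).items()
        }
    if schema_type == "string":
        return "string"
    if schema_type == "integer":
        return 0
    if schema_type == "number":
        return 0.0
    if schema_type == "boolean":
        return False
    if schema_type == "array":
        return []
    return None
```

Applied to the `UserJoined` payload schema, this returns an object with a `username` string and a `numUsers` integer, which is the shape a subscribed client would expect to receive.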
          

Thank you! 🙇

Questions?