The goal of SSEparser is to provide robust functionality to parse Server-Sent Events and to build on top of it.

Installation

You can install SSEparser from CRAN like so:

install.packages("SSEparser")

Alternatively, you can install the development version like so:

pak::pak("calderonsamuel/SSEparser")

Example

The parse_sse() function takes a string containing one or more server-sent events and converts it to an R list with one element per event.

library(SSEparser)

event <- "data: test\nevent: message\nid: 123\n\n"

parse_sse(event)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#> 
#> [[1]]$event
#> [1] "message"
#> 
#> [[1]]$id
#> [1] "123"
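
Because the result is a list with one element per event, individual fields can be read with ordinary list indexing. A small sketch based on the output above (the variable names first_event and event_id are only illustrative):

```r
library(SSEparser)

event <- "data: test\nevent: message\nid: 123\n\n"
parsed <- parse_sse(event)

# First (and only) event in the string
first_event <- parsed[[1]]

# Field values come back as character strings, so convert when a number is needed
event_id <- as.integer(first_event$id)
event_id
```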

Comments arrive as lines that start with a colon. They are ignored by the parser and do not appear in the result.

with_comment <- "data: test\n: comment\nevent: example\n\n"

parse_sse(with_comment)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#> 
#> [[1]]$event
#> [1] "example"
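
To make the comment rule concrete, here is a minimal base R sketch of how a single event block could be split into fields while skipping comment lines. The function parse_event_sketch() is hypothetical and is not how SSEparser is implemented; it only illustrates the line format.

```r
# Hypothetical helper, not part of SSEparser: parse one event block by hand
parse_event_sketch <- function(event_text) {
    lines <- strsplit(event_text, "\n", fixed = TRUE)[[1]]
    # Drop empty lines and comment lines (those starting with a colon)
    lines <- lines[nzchar(lines) & !startsWith(lines, ":")]
    # Split each remaining line at its first colon into field name and value
    colon_pos <- regexpr(":", lines, fixed = TRUE)
    fields <- substr(lines, 1, colon_pos - 1)
    values <- substr(lines, colon_pos + 1, nchar(lines))
    # The format allows one optional space after the colon; strip it
    values <- sub("^ ", "", values)
    stats::setNames(as.list(values), fields)
}

result <- parse_event_sketch("data: test\n: comment\nevent: example\n\n")
str(result)
```

This sketch assumes every non-comment line contains a colon and that each field appears once per event. A complete parser would have to cover those cases as well.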

Use in HTTP requests

parse_sse() wraps the SSEparser R6 class, which is also exported for use with real-time streaming data. The following code handles a streaming response with MIME type “text/event-stream”.

parser <- SSEparser$new()
response <- httr2::request("https://postman-echo.com/server-events/3") %>%
    httr2::req_body_json(data = list(
        event = "message",
        request = "POST"
    )) %>%
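    httr2::req_perform_stream(callback = \(x) {
        # Each chunk arrives as a raw vector; convert it to text before parsing
        event <- rawToChar(x)
        parser$parse_sse(event)
        # Returning TRUE tells httr2 to keep streaming
        TRUE
    })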

str(parser$events)
#> List of 3
#>  $ :List of 3
#>   ..$ event: chr "error"
#>   ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#>   ..$ id   : chr "1"
#>  $ :List of 3
#>   ..$ event: chr "notification"
#>   ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#>   ..$ id   : chr "2"
#>  $ :List of 3
#>   ..$ event: chr "message"
#>   ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#>   ..$ id   : chr "3"
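
The same accumulation can be tried without a network connection. The sketch below feeds the parser two hand-written chunks, standing in for what a streaming callback would receive. The chunk contents are made up for illustration, and each chunk is assumed to contain a complete event.

```r
library(SSEparser)

parser <- SSEparser$new()

# Simulated raw chunks (illustrative data, not real server output)
chunks <- list(
    charToRaw("event: message\ndata: first\nid: 1\n\n"),
    charToRaw("event: message\ndata: second\nid: 2\n\n")
)

# Mimic the callback: convert each raw chunk to text and hand it to the parser
for (chunk in chunks) {
    parser$parse_sse(rawToChar(chunk))
}

# Events from every call are collected in the same parser object
length(parser$events)
```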

Extending SSEparser

Following the previous example, it would be useful to parse the content of each data field into an R list rather than leaving it as a JSON string. To do that, we can create a new R6 class that inherits from SSEparser. We only need to override the append_parsed_sse() method.

CustomParser <- R6::R6Class(
    classname = "CustomParser",
    inherit = SSEparser,
    public = list(
        initialize = function() {
            super$initialize()
        },
        append_parsed_sse = function(parsed_event) {
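            # Convert the JSON string in the data field to an R list
            parsed_event$data <- jsonlite::fromJSON(parsed_event$data)
            # Store the event exactly as the original method does
            self$events <- c(self$events, list(parsed_event))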
            invisible(self)
        }
    )
)

Notice that the only thing we are modifying is the parsing of the data field, not the parsing of the event itself. This is the original method from SSEparser:

SSEparser$public_methods$append_parsed_sse
#> function (parsed_event) 
#> {
#>     self$events <- c(self$events, list(parsed_event))
#>     invisible(self)
#> }
#> <bytecode: 0x00000147d67ed9b8>
#> <environment: namespace:SSEparser>
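
To see the JSON conversion in isolation, the snippet below applies jsonlite::fromJSON() to a data string like the one returned in the earlier request. The variable names are only illustrative.

```r
# A data field as it appears in the unmodified parser output
json_data <- "{\"event\":\"message\",\"request\":\"POST\"}"

# fromJSON() turns a JSON object into a named R list
parsed_data <- jsonlite::fromJSON(json_data)
str(parsed_data)
```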

CustomParser uses jsonlite::fromJSON() to parse the data field of every chunk in the event stream. We can now use our custom class with the previous request.

parser <- CustomParser$new()
response <- httr2::request("https://postman-echo.com/server-events/3") %>%
    httr2::req_body_json(data = list(
        event = "message",
        request = "POST"
    )) %>%
    httr2::req_perform_stream(callback = \(x) {
        event <- rawToChar(x)
        parser$parse_sse(event)
        TRUE
    })

str(parser$events)
#> List of 3
#>  $ :List of 3
#>   ..$ event: chr "ping"
#>   ..$ data :List of 2
#>   .. ..$ event  : chr "message"
#>   .. ..$ request: chr "POST"
#>   ..$ id   : chr "1"
#>  $ :List of 3
#>   ..$ event: chr "message"
#>   ..$ data :List of 2
#>   .. ..$ event  : chr "message"
#>   .. ..$ request: chr "POST"
#>   ..$ id   : chr "2"
#>  $ :List of 3
#>   ..$ event: chr "info"
#>   ..$ data :List of 2
#>   .. ..$ event  : chr "message"
#>   .. ..$ request: chr "POST"
#>   ..$ id   : chr "3"

Now the data field holds an R list instead of a JSON string, and the conversion happens while the stream is still in progress.
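
Not every stream sends JSON in every data field. As a variation on the same idea, the sketch below only converts the data field when it is valid JSON and otherwise keeps the original string. SafeJsonParser is a hypothetical name, and the input events are made up so the example runs without a network connection.

```r
library(SSEparser)

# Hypothetical subclass, not part of SSEparser
SafeJsonParser <- R6::R6Class(
    classname = "SafeJsonParser",
    inherit = SSEparser,
    public = list(
        append_parsed_sse = function(parsed_event) {
            data <- parsed_event$data
            # Only convert when the data field holds well-formed JSON
            if (!is.null(data) && isTRUE(as.logical(jsonlite::validate(data)))) {
                parsed_event$data <- jsonlite::fromJSON(data)
            }
            # Delegate storage to the original method
            super$append_parsed_sse(parsed_event)
        }
    )
)

parser <- SafeJsonParser$new()
parser$parse_sse("data: {\"request\":\"POST\"}\nid: 1\n\n")
parser$parse_sse("data: plain text\nid: 2\n\n")
str(parser$events)
```

Calling super$append_parsed_sse() instead of repeating the assignment keeps the subclass in step with the parent class if its storage logic ever changes.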