R/Go IPC with Nanomsg Next Gen.
Besides being the way mangos is said in Bambara (derived from
Portuguese, as it happens), mangoro is a package that vendors the mangos/v3 and arrow-go Go packages for
IPC between R and Go processes, using the nanonext and
nanoarrow R packages on the R side. The package provides
helper functions to build Go binaries that use mangos and Arrow for IPC.
This is a basic setup that can be used as a starting point for more
complex R/Go IPC applications. In our opinion, this approach avoids the
complexities and limitations of cgo’s c-shared mode, which can lead to
issues with loading multiple Go runtimes in the same R session, as
discussed in this R-package-devel mailing list thread: CRAN
Policy on Go using Packages.
# Development version from r-universe (CRAN is listed as a second repo,
# presumably so that dependencies can be resolved from it)
install.packages(
  'mangoro',
  repos = c(
    'https://sounkou-bioinfo.r-universe.dev',
    'https://cloud.r-project.org'
  )
)
# For CRAN release (not available yet)
install.packages('mangoro')

Compile some Go code on the fly from R using the
mangoro_go_build() function. This uses the vendored Go code
in inst/go/vendor.
library(nanonext)
library(processx)
library(nanoarrow)
library(mangoro)
# vendored mangos version
get_mangos_version()
#> [1] "v3.4.3-0.20250905144305-2c434adf4860"
# Minimal Go REP server: every request is echoed back with a suffix.
# Socket setup and receive errors terminate the process, so a failure shows
# up as a dead process (echo_proc$is_alive() is FALSE) instead of a server
# that silently never answers while R keeps retrying.
go_echo_code <- paste(
  'package main',
  'import (',
  '    "fmt"',
  '    "os"',
  '    "go.nanomsg.org/mangos/v3/protocol/rep"',
  '    _ "go.nanomsg.org/mangos/v3/transport/ipc"',
  ')',
  'func main() {',
  '    url := os.Args[1]',
  '    sock, err := rep.NewSocket()',
  '    if err != nil {',
  '        fmt.Fprintln(os.Stderr, "socket error:", err)',
  '        os.Exit(1)',
  '    }',
  '    if err := sock.Listen(url); err != nil {',
  '        fmt.Fprintln(os.Stderr, "listen error:", err)',
  '        os.Exit(1)',
  '    }',
  '    for {',
  '        msg, err := sock.Recv()',
  '        if err != nil {',
  '            fmt.Fprintln(os.Stderr, "recv error:", err)',
  '            os.Exit(1)',
  '        }',
  '        newMsg := append(msg, []byte(" [echoed by Go]")...)',
  '        if err := sock.Send(newMsg); err != nil {',
  '            fmt.Fprintln(os.Stderr, "send error:", err)',
  '        }',
  '    }',
  '}',
  sep = "\n"
)
tmp_go <- tempfile(fileext = ".go")
writeLines(go_echo_code, tmp_go)
tmp_bin <- tempfile()
mangoro_go_build(tmp_go, tmp_bin)
#> [1] "GOMAXPROCS=1 /usr/lib/go-1.22/bin/go 'build' '-mod=vendor' '-o' '/tmp/RtmpHIQD9Z/file1b015e24cbd2cd' '/tmp/RtmpHIQD9Z/file1b015e6fb6f0c9.go'"
#> [1] "/tmp/RtmpHIQD9Z/file1b015e24cbd2cd"

Create an IPC path and send/receive a message:
# Launch the compiled echo server on a fresh IPC endpoint.
ipc_url <- create_ipc_path()
echo_proc <- processx::process$new(tmp_bin, args = ipc_url)
Sys.sleep(1)
echo_proc$is_alive()
#> [1] TRUE
sock <- nanonext::socket(protocol = "req", dial = ipc_url)
msg <- charToRaw("hello from R")
max_attempts <- 20
# Evaluate `op()` until it no longer returns a nanonext error value (e.g.
# while the Go server is not ready yet), sleeping one second between tries
# and making at most `max_attempts` calls in total.
retry_op <- function(op) {
  out <- op()
  tries <- 1
  while (tries < max_attempts && nanonext::is_error_value(out)) {
    Sys.sleep(1)
    out <- op()
    tries <- tries + 1
  }
  out
}
send_result <- retry_op(function() nanonext::send(sock, msg, mode = "raw"))
response <- retry_op(function() nanonext::recv(sock, mode = "raw"))
rawToChar(response)
#> [1] "hello from R [echoed by Go]"
close(sock)
echo_proc$kill()
#> [1] TRUE

Next, compile Go code that uses Arrow IPC for (de)serialization between R and Go.
cfg <- nanonext::serial_config(
  "ArrowTabular",
  nanoarrow::write_nanoarrow,
  nanoarrow::read_nanoarrow
)
ipc_url <- create_ipc_path()
# Go REP server that decodes each request as an Arrow IPC stream and writes
# the same record batches back as an Arrow IPC stream in the reply.
go_code <- '
package main

import (
    "bytes"
    "fmt"
    "os"

    "github.com/apache/arrow/go/v18/arrow/ipc"
    "github.com/apache/arrow/go/v18/arrow/memory"
    "go.nanomsg.org/mangos/v3/protocol/rep"
    _ "go.nanomsg.org/mangos/v3/transport/ipc"
)

func main() {
    url := os.Args[1]
    sock, err := rep.NewSocket()
    if err != nil {
        fmt.Fprintln(os.Stderr, "socket error:", err)
        os.Exit(1)
    }
    if err := sock.Listen(url); err != nil {
        fmt.Fprintln(os.Stderr, "listen error:", err)
        os.Exit(1)
    }
    for {
        msg, err := sock.Recv()
        if err != nil {
            fmt.Fprintln(os.Stderr, "recv error:", err)
            os.Exit(1)
        }
        reader, err := ipc.NewReader(bytes.NewReader(msg), ipc.WithAllocator(memory.DefaultAllocator))
        if err != nil {
            // NOTE: no reply is sent for this request.
            fmt.Println("Arrow IPC error:", err)
            continue
        }
        var buf bytes.Buffer
        writer := ipc.NewWriter(&buf, ipc.WithSchema(reader.Schema()))
        for reader.Next() {
            // The record is owned by the reader: it stays valid until the
            // next call to Next and is released by the reader itself, so it
            // must not be released here (that would be a double release).
            rec := reader.Record()
            fmt.Println(rec)
            if err := writer.Write(rec); err != nil {
                fmt.Println("Arrow IPC write error:", err)
            }
        }
        if err := reader.Err(); err != nil {
            fmt.Println("Arrow IPC read error:", err)
        }
        if err := writer.Close(); err != nil {
            fmt.Println("Arrow IPC writer close error:", err)
        }
        reader.Release()
        sock.Send(buf.Bytes())
    }
}
'
tmp_go <- tempfile(fileext = ".go")
writeLines(go_code, tmp_go)
tmp_bin <- tempfile()
mangoro_go_build(tmp_go, tmp_bin)
#> [1] "GOMAXPROCS=1 /usr/lib/go-1.22/bin/go 'build' '-mod=vendor' '-o' '/tmp/RtmpHIQD9Z/file1b015e6dbf9c59' '/tmp/RtmpHIQD9Z/file1b015e47fbb389.go'"
#> [1] "/tmp/RtmpHIQD9Z/file1b015e6dbf9c59"
# stdout/stderr are piped so the server log can be read with
# echo_proc$read_output_lines(). NOTE(review): the pipes are never drained
# in this example; a very chatty server could block once the OS pipe buffer
# is full.
echo_proc <- processx::process$new(
  tmp_bin,
  args = ipc_url,
  stdout = "|",
  stderr = "|"
)
Sys.sleep(3)

Configure the socket and send/receive Arrow IPC data. Note that we use a loop with retries to handle potential timing issues when the Go echo server is not yet ready to receive messages.
echo_proc$is_alive()
#> [1] TRUE
sock <- nanonext::socket("req", dial = ipc_url)
# The "ArrowTabular" serial config only takes effect for mode = "serial"
# transfers (see nanonext::serial_config). The Go server reads and writes
# plain Arrow IPC bytes, so both directions below use mode = "raw".
nanonext::opt(sock, "serial") <- cfg
example_stream <- nanoarrow::example_ipc_stream()
max_attempts <- 20
send_result <- nanonext::send(sock, example_stream, mode = "raw")
attempt <- 1
while (nanonext::is_error_value(send_result) && attempt < max_attempts) {
  Sys.sleep(1)
  send_result <- nanonext::send(sock, example_stream, mode = "raw")
  attempt <- attempt + 1
}
send_result
#> [1] 0
echo_proc$is_alive()
#> [1] TRUE
Sys.sleep(1)
# The reply is a bare Arrow IPC stream, not an R serialization stream:
# mode = "serial" would warn "received data could not be unserialized" and
# fall back to raw bytes, so ask for the raw bytes directly.
received <- nanonext::recv(sock, mode = "raw")
attempt <- 1
while (nanonext::is_error_value(received) && attempt < max_attempts) {
  Sys.sleep(1)
  received <- nanonext::recv(sock, mode = "raw")
  attempt <- attempt + 1
}
sent_df <- as.data.frame(read_nanoarrow(example_stream))
received_df <- as.data.frame(read_nanoarrow(received))
sent_df
#> some_col
#> 1 0
#> 2 1
#> 3 2
received_df
#> some_col
#> 1 0
#> 2 1
#> 3 2
identical(sent_df, received_df)
#> [1] TRUE
close(sock)
echo_proc$kill()
#> [1] TRUE

The package includes rgoipc, a Go package for building
RPC servers with function registration. Functions are registered in the
Go main application and called from R.
The RPC protocol wraps Arrow IPC data in a simple envelope:
[type:1byte][name_len:4bytes][name][error_len:4bytes][error][arrow_ipc_data]
Both the input and output data are serialized using the Arrow IPC format.
The Go server receives Arrow IPC data (as an arrow.Record),
processes it, and returns Arrow IPC data. On the R side, you can work with
data frames, nanoarrow streams, or any Arrow-compatible structure. The
thin RPC envelope only adds metadata (function name, error handling)
around the Arrow data.
Key concept: R data.frames naturally map to an Arrow
RecordBatch (tabular data with multiple columns). Each column in a
data.frame becomes an Arrow Array. Functions receive and return
arrow.Record objects, which represent this tabular
structure.
# Build the rpc-example server shipped in the installed package.
rpc_server_path <- system.file("go", package = "mangoro") |>
  file.path("cmd", "rpc-example", "main.go")
rpc_bin <- tempfile()
mangoro_go_build(rpc_server_path, rpc_bin)
#> [1] "GOMAXPROCS=1 /usr/lib/go-1.22/bin/go 'build' '-mod=vendor' '-o' '/tmp/RtmpHIQD9Z/file1b015e51db764f' '/usr/local/lib/R/site-library/mangoro/go/cmd/rpc-example/main.go'"
#> [1] "/tmp/RtmpHIQD9Z/file1b015e51db764f"
# Run it on a fresh IPC endpoint and give it a moment to start listening.
ipc_url <- create_ipc_path()
rpc_proc <- processx::process$new(
  rpc_bin,
  args = ipc_url,
  stdout = "|",
  stderr = "|"
)
Sys.sleep(2)
rpc_proc$is_alive()
#> [1] TRUE

Request the manifest of registered functions:
sock <- nanonext::socket(protocol = "req", dial = ipc_url)
# Ask the server which functions it exposes, together with the argument and
# return type descriptions it reports for each of them.
manifest <- sock |> mangoro_rpc_get_manifest()
manifest
#> $add
#> $add$Args
#> Name Type.Type Type.Nullable Type.StructDef Type.ListSchema Optional Default
#> 1 x float64 TRUE NA NA FALSE NA
#> 2 y float64 TRUE NA NA FALSE NA
#>
#> $add$ReturnType
#> $add$ReturnType$Type
#> [1] "float64"
#>
#> $add$ReturnType$Nullable
#> [1] TRUE
#>
#> $add$ReturnType$StructDef
#> NULL
#>
#> $add$ReturnType$ListSchema
#> NULL
#>
#>
#> $add$Vectorized
#> [1] TRUE
#>
#> $add$Metadata
#> $add$Metadata$description
#> [1] "Add two numeric vectors"
#>
#>
#>
#> $echoString
#> $echoString$Args
#> Name Type.Type Type.Nullable Type.StructDef Type.ListSchema Optional Default
#> 1 s string TRUE NA NA FALSE NA
#>
#> $echoString$ReturnType
#> $echoString$ReturnType$Type
#> [1] "string"
#>
#> $echoString$ReturnType$Nullable
#> [1] TRUE
#>
#> $echoString$ReturnType$StructDef
#> NULL
#>
#> $echoString$ReturnType$ListSchema
#> NULL
#>
#>
#> $echoString$Vectorized
#> [1] TRUE
#>
#> $echoString$Metadata
#> $echoString$Metadata$description
#> [1] "Echo back a string vector"
#>
#>
#>
#> $echoStruct
#> $echoStruct$Args
#> Name Type.Type Type.Nullable
#> 1 person struct FALSE
#> Type.Fields
#> 1 name, age, string, int32, FALSE, FALSE, NA, NA, NA, NA, NA, NA
#> Type.ListSchema Optional Default
#> 1 NA FALSE NA
#>
#> $echoStruct$ReturnType
#> $echoStruct$ReturnType$Type
#> [1] "struct"
#>
#> $echoStruct$ReturnType$Nullable
#> [1] FALSE
#>
#> $echoStruct$ReturnType$StructDef
#> $echoStruct$ReturnType$StructDef$Fields
#> Name Type.Type Type.Nullable Type.StructDef Type.ListSchema Metadata
#> 1 name string FALSE NA NA NA
#> 2 age int32 FALSE NA NA NA
#>
#>
#> $echoStruct$ReturnType$ListSchema
#> NULL
#>
#>
#> $echoStruct$Vectorized
#> [1] TRUE
#>
#> $echoStruct$Metadata
#> $echoStruct$Metadata$description
#> [1] "Echo back a struct column (nested data)"
#>
#>
#>
#> $transposeMatrix
#> $transposeMatrix$Args
#> list()
#>
#> $transposeMatrix$ReturnType
#> $transposeMatrix$ReturnType$Type
#> [1] "struct"
#>
#> $transposeMatrix$ReturnType$Nullable
#> [1] FALSE
#>
#> $transposeMatrix$ReturnType$StructDef
#> $transposeMatrix$ReturnType$StructDef$Fields
#> list()
#>
#>
#> $transposeMatrix$ReturnType$ListSchema
#> NULL
#>
#>
#> $transposeMatrix$Vectorized
#> [1] FALSE
#>
#> $transposeMatrix$Metadata
#> $transposeMatrix$Metadata$description
#> [1] "Transpose a matrix (columns <-> rows)"
close(sock)

Call the add function with Arrow IPC data:
# Fresh REQ socket for the `add` call.
sock <- nanonext::socket(protocol = "req", dial = ipc_url)
# Two double columns; the NA in `x` checks that missing values propagate.
input_df <- data.frame(
  x = c(1.5, 2.5, 3.5, NA),
  y = c(0.5, 1.5, 2.5, 4.5)
)
result <- mangoro_rpc_call(sock, "add", input_df)
result_df <- as.data.frame(result)
input_df
#> x y
#> 1 1.5 0.5
#> 2 2.5 1.5
#> 3 3.5 2.5
#> 4 NA 4.5
result_df
#> result
#> 1 2
#> 2 4
#> 3 6
#> 4 NA
# Same sum computed locally in R, for comparison.
with(input_df, x + y)
#> [1] 2 4 6 NA
close(sock)

Call the echoString function to test string
handling:
sock <- nanonext::socket(protocol = "req", dial = ipc_url)
# A character column that includes an NA, to check missing strings survive.
input_df <- data.frame(s = c("hello", "world", NA, "mangoro"))
result <- mangoro_rpc_call(sock, "echoString", input_df)
result_df <- as.data.frame(result)
input_df
#> s
#> 1 hello
#> 2 world
#> 3 <NA>
#> 4 mangoro
result_df
#> result
#> 1 hello
#> 2 world
#> 3 <NA>
#> 4 mangoro
identical(input_df[["s"]], result_df[["result"]])
#> [1] TRUE
close(sock)

Call the transposeMatrix function to demonstrate matrix
handling. In R, we send the matrix as a data.frame (each matrix column becomes a
data.frame column), and Go transposes it:
sock <- nanonext::socket("req", dial = ipc_url)
# Create a 3x4 double matrix (filled column by column) and convert it to a
# data.frame for Arrow IPC
mat <- matrix(c(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), nrow = 3, ncol = 4)
"Input matrix (3x4):"
#> [1] "Input matrix (3x4):"
mat
#> [,1] [,2] [,3] [,4]
#> [1,] 1 4 7 10
#> [2,] 2 5 8 11
#> [3,] 3 6 9 12
# Convert to data.frame - each matrix column becomes one Arrow column
input_df <- as.data.frame(mat)
# Name the columns explicitly so the Arrow field names do not depend on
# as.data.frame() defaults; seq_len() also stays correct for a zero-column
# matrix, where 1:ncol(mat) would produce c(1, 0).
colnames(input_df) <- paste0("V", seq_len(ncol(mat)))
# Transpose via RPC
result <- mangoro_rpc_call(sock, "transposeMatrix", input_df)
result_df <- as.data.frame(result)
# Compare values only (ignore dimnames)
all.equal(as.matrix(result_df), t(mat), check.attributes = FALSE)
#> [1] TRUE
close(sock)

Call the echoStruct function to demonstrate
nested/struct column handling:
sock <- nanonext::socket(protocol = "req", dial = ipc_url)
# Rows of the nested column: one record per person, with `name` and `age`
# fields.
person_df <- data.frame(
  name = c("Alice", "Bob", "Charlie"),
  age = c(30L, 25L, 35L),
  stringsAsFactors = FALSE
)
# I() keeps person_df as a single column of the outer data.frame, which is
# sent as an Arrow struct column. Without I(), data.frame() would splice it
# into separate person.name and person.age columns.
input_df <- data.frame(person = I(person_df))
# Round-trip the struct column through the Go server
result <- mangoro_rpc_call(sock, "echoStruct", input_df)
result_df <- as.data.frame(result)
str(input_df)
#> 'data.frame': 3 obs. of 1 variable:
#> $ person:Classes 'AsIs' and 'data.frame': 3 obs. of 2 variables:
#> ..$ name: chr "Alice" "Bob" "Charlie"
#> ..$ age : int 30 25 35
str(result_df)
#> 'data.frame': 3 obs. of 1 variable:
#> $ person:'data.frame': 3 obs. of 2 variables:
#> ..$ name: chr "Alice" "Bob" "Charlie"
#> ..$ age : int 30 25 35
result_df
#> person.name person.age
#> 1 Alice 30
#> 2 Bob 25
#> 3 Charlie 35
# The nested values survive the round trip; only the AsIs class is lost,
# hence check.attributes = FALSE
all.equal(input_df[["person"]], result_df[["person"]], check.attributes = FALSE)
#> [1] TRUE
close(sock)
rpc_proc$kill()
#> [1] TRUE

The package includes an HTTP file server that can be controlled via RPC, demonstrating a slightly more complex use case.
# Build the HTTP server controller shipped in the installed package
http_server_path <- system.file("go", package = "mangoro") |>
  file.path("cmd", "http-server", "main.go")
http_bin <- tempfile()
mangoro_go_build(http_server_path, http_bin, gomaxprocs = 4)
#> [1] "GOMAXPROCS=4 /usr/lib/go-1.22/bin/go 'build' '-mod=vendor' '-o' '/tmp/RtmpHIQD9Z/file1b015e653805ea' '/usr/local/lib/R/site-library/mangoro/go/cmd/http-server/main.go'"
#> [1] "/tmp/RtmpHIQD9Z/file1b015e653805ea"
# Only the RPC controller is launched here; the HTTP server itself is
# started later through RPC calls.
ipc_url <- create_ipc_path()
http_ctl_proc <- processx::process$new(
  http_bin,
  args = ipc_url,
  stdout = "|",
  stderr = "|"
)
Sys.sleep(2)
http_ctl_proc$is_alive()
#> [1] TRUE
http_ctl_proc$read_output_lines()
#> [1] "Registered functions: [startServer stopServer serverStatus]"
#> [2] "HTTP server controller listening on ipc:///tmp/RtmpHIQD9Z/mangoro-echo1b015e3dd417b2.ipc"

Control the HTTP server via RPC:
# This socket stays open: the HTTPS example reuses it.
sock <- nanonext::socket("req", dial = ipc_url)
# Get server status (should be stopped initially)
status <- mangoro_http_status(sock)
status
#> status message
#> 1 status stopped
# Start an HTTP server on 127.0.0.1:8080 serving the current directory
# (address passed by name, as in the HTTPS example)
result <- mangoro_http_start(
  sock,
  addr = "127.0.0.1:8080",
  dir = ".",
  cors = TRUE
)
result
#> status message
#> 1 ok HTTP server started on 127.0.0.1:8080
# Check status again
status <- mangoro_http_status(sock)
status
#> status message
#> 1 status running at 127.0.0.1:8080
# Fetch the first lines of the directory listing served at /
readLines("http://127.0.0.1:8080/", n = 3, warn = FALSE)
#> [1] "<pre>"
#> [2] "<a href=\"..Rcheck/\">..Rcheck/</a>"
#> [3] "<a href=\".Rbuildignore\">.Rbuildignore</a>"
# Stop the server
result <- mangoro_http_stop(sock)
result
#> status message
#> 1 ok HTTP server stopped
# Verify it stopped
status <- mangoro_http_status(sock)
status
#> status message
#> 1 status stopped

The HTTP server supports HTTPS with TLS certificates. Start an HTTPS server by providing certificate and key files:
# Reuse the running controller (and its socket) to start an HTTPS server
# with the test certificate/key pair stored in .certs_test/
result <- mangoro_http_start(
  sock,
  addr = "127.0.0.1:8443",
  dir = ".",
  tls = TRUE,
  cert = normalizePath(".certs_test/cert.pem"),
  key = normalizePath(".certs_test/key.pem"),
  cors = TRUE
)
result
#> status message
#> 1 ok HTTP server started on 127.0.0.1:8443
# Confirm the server is up
status <- mangoro_http_status(sock)
status
#> status message
#> 1 status running at 127.0.0.1:8443
# Fetch a file over HTTPS with the curl backend of download.file().
# `-k` makes curl skip certificate verification, presumably because the
# test certificate is self-signed.
temp_file <- tempfile()
download.file(
  "https://127.0.0.1:8443/.Rbuildignore",
  destfile = temp_file,
  method = "curl",
  extra = "-k",
  quiet = TRUE
)
# Show the start of the downloaded file
readLines(temp_file, n = 3)
#> [1] "LICENSE" "^\\.certs_test/.*$" "^misc/.*$"
# Shut the HTTPS server down again
result <- mangoro_http_stop(sock)
result
#> status message
#> 1 ok HTTP server stopped
http_ctl_proc$read_output_lines()
#> [1] "[mangoro server] 2025/11/17 19:13:25 Starting HTTP server on 127.0.0.1:8080 serving /root/mangoro at /"
#> [2] "[mangoro server] 2025/11/17 19:13:27 GET / 127.0.0.1:34570 66.696µs"
#> [3] "[mangoro server] 2025/11/17 19:13:27 HTTP server stopped"
#> [4] "[mangoro server] 2025/11/17 19:13:30 Starting HTTPS server on 127.0.0.1:8443 serving /root/mangoro at /"
#> [5] "[mangoro server] 2025/11/17 19:13:32 GET /.Rbuildignore 127.0.0.1:59298 1.586772ms"
#> [6] "[mangoro server] 2025/11/17 19:13:32 HTTP server stopped"
close(sock)
http_ctl_proc$kill()
#> [1] TRUE

The rgoipc package provides the interfaces for type-safe
function registration with Arrow schema validation. See inst/go/pkg/rgoipc for the Go package
implementation, and inst/go/cmd/rpc-example and inst/go/cmd/http-server for complete
server examples.
There is currently some conversion overhead when sending data to Go
processes, because R objects are first converted to Arrow data with nanoarrow
and then sent as bytes. Moreover, for some reason, we cannot send matrices
directly (they have to be converted to data frames first) because of a
cryptic requirement on the arrow package.
Code and documentation in this project have been generated with the assistance of the GitHub Copilot LLM tools. While we have reviewed and edited the generated content, we acknowledge that LLM tools were used in the creation process; accordingly (since these models are trained on GPL code and other commons, and proprietary software licensing is fake anyway), the code is released under GPL-3. So if you use this code in any way, you must comply with the GPL-3 license.