Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
b11823c
Wrote server/client interpreters
Baccata Feb 21, 2023
8b5795f
Example compiles
Baccata Feb 21, 2023
c18a814
Fixed un-bound flatMap, tried a few other things
Baccata Feb 21, 2023
619eb63
Merge remote-tracking branch 'origin/main' into smithy4s-integration
Baccata Feb 21, 2023
cce5d1d
Fix comment
Baccata Feb 21, 2023
0c3251b
Revert changes to FS2Channel
Baccata Feb 21, 2023
2cfb589
Bump remaining versions
Baccata Feb 21, 2023
4b8c0db
Merge branch 'main' into smithy4s-integration
ghostbuster91 May 3, 2025
6f93323
Merge branch 'main' into smithy4s-integration
ghostbuster91 May 4, 2025
53cee0a
Upgrade to latest smithy4s
ghostbuster91 May 4, 2025
fe670de
Fix scala native part of the build
ghostbuster91 May 5, 2025
799e818
Fix duplicated resources error
ghostbuster91 May 5, 2025
9523f61
Cross publish smithy across all platforms
ghostbuster91 May 5, 2025
67c49a5
Add versionScheme
ghostbuster91 May 5, 2025
703e25a
Convert for-comp to flatMap
ghostbuster91 May 5, 2025
0e7dd22
Add method on FS2Channel: resource (#80)
ghostbuster91 May 6, 2025
7d1323c
Fix smithy protocol wiring
kubukoz May 6, 2025
c3ce708
Merge pull request #81 from neandertech/fix-smithy-setup
ghostbuster91 May 6, 2025
6269973
Generate java parts for smithy traits
ghostbuster91 May 6, 2025
8aea9e6
Set minJdkVersion to 11
ghostbuster91 May 6, 2025
e45acf7
Remove unused file
ghostbuster91 May 6, 2025
97462b6
Simplify smithy4s implementation, get rid of fs2 in it
kubukoz May 7, 2025
70060bd
Merge branch 'smithy4s-integration' into smithy-simplify
kubukoz May 7, 2025
cb40262
bump smithy4s
kubukoz May 7, 2025
1ca96ee
Merge pull request #82 from neandertech/smithy-simplify
ghostbuster91 May 7, 2025
be15b68
Remove half-baked FutureBaseChannel
ghostbuster91 May 8, 2025
2ba2bab
Revert "Remove half-baked FutureBaseChannel"
ghostbuster91 May 8, 2025
5359542
feat: Replace jsoniter macros with circe (#83)
ghostbuster91 May 13, 2025
e46c197
Replace ChildProcess with fs2.Process
ghostbuster91 May 13, 2025
6c4b437
Add common smithy traits into protocol definition
ghostbuster91 May 13, 2025
830a6b0
Better way to coerce unit
ghostbuster91 May 14, 2025
c070f65
Errors smithy4s (#86)
ghostbuster91 May 15, 2025
bec1dca
Filter out optionals by default (#87)
kubukoz May 16, 2025
246ba77
Accessing endpoints from multiple servers
ghostbuster91 May 16, 2025
903ab82
chore: sort and group imports with scalafmt
ghostbuster91 May 16, 2025
3821de8
Add some scaladocs
ghostbuster91 May 16, 2025
fdb23b4
internal error when processing notification should not break the server
ghostbuster91 May 16, 2025
9395f05
Add smithy validations (#88)
ghostbuster91 May 17, 2025
a76d283
Fix RawMessage decoder
ghostbuster91 May 26, 2025
c3546cc
feat: Add jsonPayload trait (#89)
ghostbuster91 May 28, 2025
e168783
Rename traits to include Rpc in their names
ghostbuster91 May 29, 2025
074fb87
Update readme
ghostbuster91 May 29, 2025
8eee63e
Fix missing renames
ghostbuster91 May 29, 2025
381b083
Update README.md
ghostbuster91 Jun 7, 2025
55e611e
Update project/build.sbt
ghostbuster91 Jun 7, 2025
6f2f70c
Update project/plugins.sbt
ghostbuster91 Jun 7, 2025
1bc15ae
Apply review comments
ghostbuster91 Jun 7, 2025
9734e14
Replace target with release
ghostbuster91 Jun 7, 2025
23b3eb7
Create caches for document encoders (#91)
ghostbuster91 Jun 14, 2025
9cbe713
Fail when service does support jsonRPC protocol
ghostbuster91 Jun 15, 2025
b63bcfe
Mark extension classes as final
ghostbuster91 Jun 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .mill-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.10
0.10.11
70 changes: 65 additions & 5 deletions build.sc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import mill.define.Sources
import mill.define.Target
import mill.util.Jvm
import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION`
import $ivy.`com.disneystreaming.smithy4s::smithy4s-mill-codegen-plugin::0.17.4`
import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.2`
import $ivy.`io.chris-kipp::mill-ci-release::0.1.5`

Expand All @@ -13,16 +15,18 @@ import scalanativelib._
import mill.scalajslib.api._
import io.github.davidgregory084._
import io.kipp.mill.ci.release.CiReleaseModule
import _root_.smithy4s.codegen.mill._

object versions {
val scala212Version = "2.12.16"
val scala213Version = "2.13.10"
val scala3Version = "3.1.2"
val scalaJSVersion = "1.10.1"
val scalaNativeVersion = "0.4.8"
val scala3Version = "3.2.2"
val scalaJSVersion = "1.13.0"
val scalaNativeVersion = "0.4.10"
val munitVersion = "0.7.29"
val munitNativeVersion = "1.0.0-M7"
val fs2 = "3.3.0"
val jsoniterVersion = "2.21.0"
val fs2 = "3.6.1"
val weaver = "0.8.0"

val scala213 = "2.13"
Expand All @@ -40,7 +44,7 @@ import versions._
object core extends RPCCrossPlatformModule { cross =>

def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::2.17.0"
ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::$jsoniterVersion"
)

object jvm extends mill.Cross[JvmModule](scala213, scala3)
Expand Down Expand Up @@ -83,6 +87,41 @@ object fs2 extends RPCCrossPlatformModule { cross =>

}

object smithy extends JavaModule {}

object smithy4s extends RPCCrossPlatformModule { cross =>

override def crossPlatformModuleDeps = Seq(fs2)
def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
ivy"com.disneystreaming.smithy4s::smithy4s-json::${_root_.smithy4s.codegen.BuildInfo.version}"
)

// A module holding the code-generation logic to help cache that task
object gen extends Smithy4sModule {
def scalaVersion = "2.13.10"
def smithy4sInternalDependenciesAsJars = T {
List(smithy.jar())
}
}

object jvm extends mill.Cross[JvmModule](scala213, scala3)
def sharedSmithy = T.sources(T.workspace / "smithy" / "resources" / "META-INF" / "smithy")
class JvmModule(cv: String) extends cross.JVM(cv) with Smithy4sModule {
def smithy4sInputDirs = sharedSmithy
}

object js extends mill.Cross[JsModule](scala213, scala3)
class JsModule(cv: String) extends cross.JS(cv) with Smithy4sModule {
def smithy4sInputDirs = sharedSmithy
}

object native extends mill.Cross[NativeModule](scala3)
class NativeModule(cv: String) extends cross.Native(cv) with Smithy4sModule {
def smithy4sInputDirs = sharedSmithy
}

}

object examples extends mill.define.Module {

object server extends ScalaModule {
Expand All @@ -101,6 +140,27 @@ object examples extends mill.define.Module {
}
}

object smithyShared extends Smithy4sModule {
def moduleDeps = Seq(smithy4s.jvm(versions.scala213))
def scalaVersion = versions.scala213Version
}

object smithyServer extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared)
def scalaVersion = versions.scala213Version
}

object smithyClient extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared)
def scalaVersion = versions.scala213Version
def forkEnv: Target[Map[String, String]] = T {
val assembledServer = smithyServer.assembly()
super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString())
}
}

}

// #############################################################################
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package examples.smithy.client

import fs2.Stream
import cats.effect._
import cats.syntax.all._
import scala.jdk.CollectionConverters._
import java.io.OutputStream

trait ChildProcess[F[_]] {
def stdin: fs2.Pipe[F, Byte, Unit]
def stdout: Stream[F, Byte]
def stderr: Stream[F, Byte]
}

object ChildProcess {

def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] =
Stream.resource(startRes(command))

val readBufferSize = 512

private def startRes[F[_]: Async](command: Seq[String]) = Resource
.make {
Async[F].interruptible(new java.lang.ProcessBuilder(command.asJava).start())
} { p =>
Sync[F].interruptible(p.destroy())
}
.map { p =>
val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit()))
new ChildProcess[F] {
def stdin: fs2.Pipe[F, Byte, Unit] =
writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream()))

def stdout: fs2.Stream[F, Byte] = fs2.io
.readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize)

def stderr: fs2.Stream[F, Byte] = fs2.io
.readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize)
// Avoids broken pipe - we cut off when the program ends.
// Users can decide what to do with the error logs using the exitCode value
.interruptWhen(done.void.attempt)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readInputStream is not using interruptible looks like.
https://github.com/typelevel/fs2/blob/56b86022d48d3caf0a9b5f00bf637cac62bb7f5e/io/shared/src/main/scala/fs2/io/io.scala#L46

So that would probably explain why it is hanging. We can try swapping that, but as you mentioned interruptibility might be broken like it is for stdin. In which case I suppose each Process would have to start its own dedicated blocking threads that it can kill 🤔

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I published 3.7-ae0fc02-SNAPSHOT which swapped in interruptible, you can give that a try and see if it helps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't resolve yet, are you sure it's the right version ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh, my bad. Will try in a bit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm I only get the initial prints from both the client and server side, but then nothing ... It may be because chunks are not automatically flushed in your implementation. That's why I couldn't use the Davenverse lib directly :

https://github.com/neandertech/jsonrpclib/pull/58/files#diff-281c0b146d38f8e1fc9f7c77eb87773debe12d364454ef6766472a26563a0ee6R47-R59

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, I missed that. Thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, try this one 3.7-eacce62-SNAPSHOT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works 🎉 most things that should print actually print (except for the server termination message but that's some userland problem), and the program terminates as expected !

}
}

/** Adds a flush after each chunk
*/
def writeOutputStreamFlushingChunks[F[_]](
fos: F[OutputStream],
closeAfterUse: Boolean = true
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
s => {
def useOs(os: OutputStream): Stream[F, Nothing] =
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))

val os =
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
else Stream.eval(fos)
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
}

}
60 changes: 60 additions & 0 deletions examples/smithyClient/src/examples/smithy/client/ClientMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package examples.smithy.client

import cats.effect._
import cats.syntax.all._
import fs2.Stream
import fs2.io._
import jsonrpclib.CallId
import jsonrpclib.fs2._
import jsonrpclib.smithy4sinterop.ClientStub
import jsonrpclib.smithy4sinterop.ServerEndpoints
import test._

import java.io.InputStream
import java.io.OutputStream

object SmithyClientMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

type IOStream[A] = fs2.Stream[IO, A]
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str))

// Implementing the generated interface
object Client extends TestClient[IO] {
def pong(pong: String): IO[Unit] = IO.consoleForIO.errorln(s"Client received pong: $pong")
}

def run: IO[Unit] = {
import scala.concurrent.duration._
val run = for {
////////////////////////////////////////////////////////
/////// BOOTSTRAPPING
////////////////////////////////////////////////////////
_ <- log("Starting client")
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
// Starting the server
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
// Creating a channel that will be used to communicate to the server
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some)
// Mounting our implementation of the generated interface onto the channel
_ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client))
// Creating stubs to talk to the remote server
server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel)
_ <- Stream(())
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin))
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce))
.concurrently(rp.stderr.through(fs2.io.stderr[IO]))

////////////////////////////////////////////////////////
/////// INTERACTION
////////////////////////////////////////////////////////
result1 <- Stream.eval(server.greet("Client"))
_ <- log(s"Client received $result1")
_ <- Stream.eval(server.ping("Ping"))
} yield ()
run.compile.drain.guarantee(IO.consoleForIO.errorln("Terminating client"))
}

}
42 changes: 42 additions & 0 deletions examples/smithyServer/src/examples/smithy/server/ServerMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package examples.smithy.server

import jsonrpclib.CallId
import jsonrpclib.fs2._
import cats.effect._
import fs2.io._
import jsonrpclib.Endpoint
import cats.syntax.all._
import test._ // smithy4s-generated package
import jsonrpclib.smithy4sinterop.ClientStub
import jsonrpclib.smithy4sinterop.ServerEndpoints

object ServerMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

// Implementing the generated interface
class ServerImpl(client: TestClient[IO]) extends TestServer[IO] {
def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Server says: hello $name !"))

def ping(ping: String): IO[Unit] = client.pong(s"Returned to sender: $ping")
}

def printErr(s: String): IO[Unit] = IO.consoleForIO.errorln(s)

def run: IO[Unit] = {
val run = for {
channel <- FS2Channel[IO](cancelTemplate = Some(cancelEndpoint))
testClient <- ClientStub.stream(TestClient, channel)
_ <- channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient)))
_ <- fs2.Stream
.eval(IO.never) // running the server forever
.concurrently(stdin[IO](512).through(lsp.decodeMessages).through(channel.inputOrBounce))
.concurrently(channel.output.through(lsp.encodeMessages).through(stdout[IO]))
} yield {}

// Using errorln as stdout is used by the RPC channel
printErr("Starting server") >> run.compile.drain.guarantee(printErr("Terminating server"))
}

}
45 changes: 45 additions & 0 deletions examples/smithyShared/smithy/spec.smithy
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
$version: "2.0"

namespace test

use jsonrpclib#jsonRequest
use jsonrpclib#jsonRPC
use jsonrpclib#jsonNotification

@jsonRPC
service TestServer {
operations: [Greet, Ping]
}

@jsonRPC
service TestClient {
operations: [Pong]
}

@jsonRequest("greet")
operation Greet {
input := {
@required
name: String
}
output := {
@required
message: String
}
}

@jsonNotification("ping")
operation Ping {
input := {
@required
ping: String
}
}

@jsonNotification("pong")
operation Pong {
input := {
@required
pong: String
}
}
23 changes: 23 additions & 0 deletions smithy/resources/META-INF/smithy/jsonrpclib.smithy
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
$version: "2.0"

namespace jsonrpclib
Copy link
Contributor Author

@Baccata Baccata Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this smithy file contains the protocol definition. It kind of echoes a question that @ckipp01 had on my BSP PR about where would be a good "central" for protocols.

I suppose DisneyStreaming could host it in https://github.com/disneystreaming/alloy/, but I'm a bit wary about putting it there and have the BSP depend on it ($work project etc etc).


/// the JSON-RPC protocol,
/// see https://www.jsonrpc.org/specification
@protocolDefinition(traits: [
jsonRequest
jsonNotification
])
@trait(selector: "service")
structure jsonRPC {
}

/// Identifies an operation that abides by request/response semantics
/// https://www.jsonrpc.org/specification#request_object
@trait(selector: "operation")
string jsonRequest

/// Identifies an operation that abides by fire-and-forget semantics
/// see https://www.jsonrpc.org/specification#notification
@trait(selector: "operation")
string jsonNotification
1 change: 1 addition & 0 deletions smithy/resources/META-INF/smithy/manifest
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
jsonrpclib.smithy
Loading