-
Notifications
You must be signed in to change notification settings - Fork 6
Smithy4s integration #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
b11823c
8b5795f
c18a814
619eb63
cce5d1d
0c3251b
2cfb589
4b8c0db
6f93323
53cee0a
fe670de
799e818
9523f61
67c49a5
703e25a
0e7dd22
7d1323c
c3ce708
6269973
8aea9e6
e45acf7
97462b6
70060bd
cb40262
1ca96ee
be15b68
2ba2bab
5359542
e46c197
6c4b437
830a6b0
c070f65
bec1dca
246ba77
903ab82
3821de8
fdb23b4
9395f05
a76d283
c3546cc
e168783
074fb87
8eee63e
381b083
55e611e
6f2f70c
1bc15ae
9734e14
23b3eb7
9cbe713
b63bcfe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
0.10.10 | ||
0.10.11 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package examples.smithy.client | ||
|
||
import fs2.Stream | ||
import cats.effect._ | ||
import cats.syntax.all._ | ||
import scala.jdk.CollectionConverters._ | ||
import java.io.OutputStream | ||
|
||
trait ChildProcess[F[_]] { | ||
def stdin: fs2.Pipe[F, Byte, Unit] | ||
def stdout: Stream[F, Byte] | ||
def stderr: Stream[F, Byte] | ||
} | ||
|
||
object ChildProcess { | ||
|
||
def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] = | ||
Stream.resource(startRes(command)) | ||
|
||
val readBufferSize = 512 | ||
|
||
private def startRes[F[_]: Async](command: Seq[String]) = Resource | ||
.make { | ||
Async[F].interruptible(new java.lang.ProcessBuilder(command.asJava).start()) | ||
} { p => | ||
Sync[F].interruptible(p.destroy()) | ||
} | ||
.map { p => | ||
val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit())) | ||
new ChildProcess[F] { | ||
def stdin: fs2.Pipe[F, Byte, Unit] = | ||
writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream())) | ||
|
||
def stdout: fs2.Stream[F, Byte] = fs2.io | ||
.readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize) | ||
|
||
def stderr: fs2.Stream[F, Byte] = fs2.io | ||
.readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize) | ||
// Avoids broken pipe - we cut off when the program ends. | ||
// Users can decide what to do with the error logs using the exitCode value | ||
.interruptWhen(done.void.attempt) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
So that would probably explain why it is hanging. We can try swapping that, but as you mentioned interruptibility might be broken like it is for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I published There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't resolve yet, are you sure it's the right version ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doh, my bad. Will try in a bit There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mmm I only get the initial prints from both the client and server side, but then nothing ... It may be because chunks are not automatically flushed in your implementation. That's why I couldn't use the Davenverse lib directly : There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woops, I missed that. Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, try this one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It works 🎉 most things that should print actually print (except for the server termination message but that's some userland problem), and the program terminates as expected ! |
||
} | ||
} | ||
|
||
/** Adds a flush after each chunk | ||
*/ | ||
def writeOutputStreamFlushingChunks[F[_]]( | ||
fos: F[OutputStream], | ||
closeAfterUse: Boolean = true | ||
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] = | ||
s => { | ||
def useOs(os: OutputStream): Stream[F, Nothing] = | ||
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush())) | ||
|
||
val os = | ||
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close())) | ||
else Stream.eval(fos) | ||
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush()))) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package examples.smithy.client | ||
|
||
import cats.effect._ | ||
import cats.syntax.all._ | ||
import fs2.Stream | ||
import fs2.io._ | ||
import jsonrpclib.CallId | ||
import jsonrpclib.fs2._ | ||
import jsonrpclib.smithy4sinterop.ClientStub | ||
import jsonrpclib.smithy4sinterop.ServerEndpoints | ||
import test._ | ||
|
||
import java.io.InputStream | ||
import java.io.OutputStream | ||
|
||
object SmithyClientMain extends IOApp.Simple { | ||
|
||
// Reserving a method for cancelation. | ||
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) | ||
|
||
type IOStream[A] = fs2.Stream[IO, A] | ||
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str)) | ||
|
||
// Implementing the generated interface | ||
object Client extends TestClient[IO] { | ||
def pong(pong: String): IO[Unit] = IO.consoleForIO.errorln(s"Client received pong: $pong") | ||
} | ||
|
||
def run: IO[Unit] = { | ||
import scala.concurrent.duration._ | ||
val run = for { | ||
//////////////////////////////////////////////////////// | ||
/////// BOOTSTRAPPING | ||
//////////////////////////////////////////////////////// | ||
_ <- log("Starting client") | ||
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist")) | ||
// Starting the server | ||
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar) | ||
// Creating a channel that will be used to communicate to the server | ||
fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some) | ||
// Mounting our implementation of the generated interface onto the channel | ||
_ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client)) | ||
// Creating stubs to talk to the remote server | ||
server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel) | ||
_ <- Stream(()) | ||
.concurrently(fs2Channel.output.through(lsp.encodeMessages).through(rp.stdin)) | ||
.concurrently(rp.stdout.through(lsp.decodeMessages).through(fs2Channel.inputOrBounce)) | ||
.concurrently(rp.stderr.through(fs2.io.stderr[IO])) | ||
|
||
//////////////////////////////////////////////////////// | ||
/////// INTERACTION | ||
//////////////////////////////////////////////////////// | ||
result1 <- Stream.eval(server.greet("Client")) | ||
_ <- log(s"Client received $result1") | ||
_ <- Stream.eval(server.ping("Ping")) | ||
} yield () | ||
run.compile.drain.guarantee(IO.consoleForIO.errorln("Terminating client")) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package examples.smithy.server | ||
|
||
import jsonrpclib.CallId | ||
import jsonrpclib.fs2._ | ||
import cats.effect._ | ||
import fs2.io._ | ||
import jsonrpclib.Endpoint | ||
import cats.syntax.all._ | ||
import test._ // smithy4s-generated package | ||
import jsonrpclib.smithy4sinterop.ClientStub | ||
import jsonrpclib.smithy4sinterop.ServerEndpoints | ||
|
||
object ServerMain extends IOApp.Simple { | ||
|
||
// Reserving a method for cancelation. | ||
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) | ||
|
||
// Implementing the generated interface | ||
class ServerImpl(client: TestClient[IO]) extends TestServer[IO] { | ||
def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Server says: hello $name !")) | ||
|
||
def ping(ping: String): IO[Unit] = client.pong(s"Returned to sender: $ping") | ||
} | ||
|
||
def printErr(s: String): IO[Unit] = IO.consoleForIO.errorln(s) | ||
|
||
def run: IO[Unit] = { | ||
val run = for { | ||
channel <- FS2Channel[IO](cancelTemplate = Some(cancelEndpoint)) | ||
testClient <- ClientStub.stream(TestClient, channel) | ||
_ <- channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient))) | ||
_ <- fs2.Stream | ||
.eval(IO.never) // running the server forever | ||
.concurrently(stdin[IO](512).through(lsp.decodeMessages).through(channel.inputOrBounce)) | ||
.concurrently(channel.output.through(lsp.encodeMessages).through(stdout[IO])) | ||
} yield {} | ||
|
||
// Using errorln as stdout is used by the RPC channel | ||
printErr("Starting server") >> run.compile.drain.guarantee(printErr("Terminating server")) | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
$version: "2.0" | ||
|
||
namespace test | ||
|
||
use jsonrpclib#jsonRequest | ||
use jsonrpclib#jsonRPC | ||
use jsonrpclib#jsonNotification | ||
|
||
@jsonRPC | ||
service TestServer { | ||
operations: [Greet, Ping] | ||
} | ||
|
||
@jsonRPC | ||
service TestClient { | ||
operations: [Pong] | ||
} | ||
|
||
@jsonRequest("greet") | ||
operation Greet { | ||
input := { | ||
@required | ||
name: String | ||
} | ||
output := { | ||
@required | ||
message: String | ||
} | ||
} | ||
|
||
@jsonNotification("ping") | ||
operation Ping { | ||
input := { | ||
@required | ||
ping: String | ||
} | ||
} | ||
|
||
@jsonNotification("pong") | ||
operation Pong { | ||
input := { | ||
@required | ||
pong: String | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
$version: "2.0" | ||
|
||
namespace jsonrpclib | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this smithy file contains the protocol definition. It kind of echoes a question that @ckipp01 had on my BSP PR about where would be a good "central" for protocols. I suppose DisneyStreaming could host it in https://github.com/disneystreaming/alloy/, but I'm a bit wary about putting it there and have the BSP depend on it ($work project etc etc). |
||
|
||
/// the JSON-RPC protocol, | ||
/// see https://www.jsonrpc.org/specification | ||
@protocolDefinition(traits: [ | ||
jsonRequest | ||
jsonNotification | ||
]) | ||
@trait(selector: "service") | ||
structure jsonRPC { | ||
} | ||
|
||
/// Identifies an operation that abides by request/response semantics | ||
/// https://www.jsonrpc.org/specification#request_object | ||
@trait(selector: "operation") | ||
string jsonRequest | ||
|
||
/// Identifies an operation that abides by fire-and-forget semantics | ||
/// see https://www.jsonrpc.org/specification#notification | ||
@trait(selector: "operation") | ||
string jsonNotification |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
jsonrpclib.smithy |
Uh oh!
There was an error while loading. Please reload this page.