Skip to content

OfxOscReceiver: from detach() to join() #7949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions addons/ofxOsc/src/ofxOscReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ofxOscReceiver & ofxOscReceiver::copy(const ofxOscReceiver & other) {
//--------------------------------------------------------------
bool ofxOscReceiver::setup(std::string host, int port) {
if (listenSocket) { // already running
ofLogVerbose("ofxOscReceiver::setup()") << "socket already listening";
stop();
}
settings.port = port;
Expand Down Expand Up @@ -78,7 +79,7 @@ bool ofxOscReceiver::start() {
if (!what.empty() && what.back() == '\n') {
what = what.substr(0, what.size() - 1);
}
ofLogError("ofxOscReceiver") << "couldn't create receiver on port "
ofLogError("ofxOscReceiver::start()") << "couldn't create receiver on port "
<< settings.port << ": " << what;
if (socket != nullptr) {
delete socket;
Expand All @@ -88,25 +89,38 @@ bool ofxOscReceiver::start() {
}

listenThread = std::thread([this] {
while (listenSocket) {
try {
listenSocket->Run();
} catch (std::exception & e) {
ofLogWarning("ofxOscReceiver") << e.what();
}
try {
listenSocket->Run();
} catch (std::exception & e) {
ofLogWarning("ofxOscReceiver::listenSocket->Run()") << e.what();
}
});

// detach thread so we don't have to wait on it before creating a new socket
// or on destruction, the custom deleter for the socket unique_ptr already
// does the right thing

#ifndef OSC_NO_DETACH
listenThread.detach();

#endif
return true;
}

//--------------------------------------------------------------
void ofxOscReceiver::stop() {
#ifdef OSC_NO_DETACH
if (listenSocket) {
listenSocket->AsynchronousBreak();
} else {
ofLogNotice("socket already torn down ");
}

if (!listenThread.joinable()) {
ofLogNotice("not joinable");
} else {
listenThread.join();
}
#endif
listenSocket.reset();
}

Expand Down
1 change: 1 addition & 0 deletions apps/devApps/StressofxOscReceiver/addons.make
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ofxOsc
17 changes: 17 additions & 0 deletions apps/devApps/StressofxOscReceiver/src/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include "ofMain.h"
#include "ofApp.h"

//========================================================================
int main( ){

//Use ofGLFWWindowSettings for more options like multi-monitor fullscreen
ofGLWindowSettings settings;
settings.setSize(1024, 768);
settings.windowMode = OF_WINDOW; //can also be OF_FULLSCREEN

auto window = ofCreateWindow(settings);

ofRunApp(window, std::make_shared<ofApp>());
ofRunMainLoop();

}
99 changes: 99 additions & 0 deletions apps/devApps/StressofxOscReceiver/src/ofApp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include "ofApp.h"

#define USE_EXPLICIT_FUNCTION 1

#define INSTANTLY_OUT_OF_SCOPE 1

void ofApp::setup(){
for (size_t i = 0; i < testers_.size(); i++) {
testers_[i] = std::make_shared<Tester>();
}
ofSetVerticalSync(false);

#ifdef INSTANTLY_OUT_OF_SCOPE
ofLogNotice("will hang");
ofxOscReceiver r;
#endif
ofLogNotice("ready to roll");

}

void ofApp::update(){

if (mode_ == STOP) {
if (ofGetFrameNum()%6==0) {
for (const auto & tester: testers_) {
tester->receiver_->stop();
}
} if (ofGetFrameNum()%6==3) {
for (const auto & tester: testers_) {
tester->receiver_->start();
}
}
} else if (mode_ == DYNA) {
if (ofGetFrameNum()%3==0) {
for (const auto & tester: testers_) {
tester->receiver_ = std::make_shared<ofxOscReceiver>(port_);
if (tester->receiver_->isListening()) {
tester->sender_ = std::make_shared<ofxOscSender>("127.0.0.1", port_);
receivers_++;
}
if (port_++ > high_port_) port_ = low_port_;
}
}
}

for (const auto & tester: testers_) {
for (size_t i=0; i<10; i++) {
#if defined(USE_EXPLICIT_FUNCTION)
ofxOscMessage m;
m.setAddress("/test");
m.addIntArg(std::int32_t(i));
m.addIntArg(port_);
m.addIntArg(i);
tester->sender_->sendMessage(m);
#else
ofxOscMessage m{"/test"};
m.add(i);
tester->sender_->sendMessage(m.add(port_).add(i));
#endif
msgs_out_++;
}
}

for (const auto & tester: testers_) {
while (auto m = tester->receiver_->getMessage()) {
msgs_++;
}
}
}

void ofApp::draw() {
for (int i = 0; i < 1000; i++) {
ofSetColor(ofRandom(0,255), ofRandom(0, 255), ofRandom(0, 255));
ofDrawRectangle(i *15, 10, 10, 50);
}

ofSetColor(ofColor::white);
ofDrawBitmapString("receivers destroyed/created: " + ofToString(receivers_), 10, 80);
ofDrawBitmapString("sent: " + ofToString(msgs_out_), 10, 100);
ofDrawBitmapString("received: " + ofToString(msgs_), 10, 120);
auto miss = msgs_out_-msgs_;
ofDrawBitmapString("= lost: " + ofToString(miss) + " (" + ofToString((float(miss)/(msgs_out_))*100.0f, 2)+"%)", 10, 140);

ofDrawBitmapString("mode 1: dynamic reallocation every 3 frames (<5% loss)", 10, 200);
ofDrawBitmapString("mode 2: open/close every 3 frames (normal to have 50% loss)", 10, 220);
ofDrawBitmapString("mode 3: stable (should have 0% loss)", 10, 240);
ofDrawBitmapString("current mode: "+ofToString(mode_+1), 10, 260);

ofDrawBitmapString("FPS (unsync'ed to stress things): " + ofToString(ofGetFrameRate(), 2), 10, ofGetHeight()-30);
}

void ofApp::keyPressed(int key){
if (key=='1' ) mode_ = DYNA;
if (key=='2' ) mode_ = STOP;
if (key=='3' ) mode_ = STABLE;
msgs_ = 0;
msgs_out_ = 0;
}

38 changes: 38 additions & 0 deletions apps/devApps/StressofxOscReceiver/src/ofApp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include "ofMain.h"
#include "ofxOsc.h"

struct Tester {
std::shared_ptr<ofxOscSender> sender_;
std::shared_ptr<ofxOscReceiver> receiver_;
};

class ofApp : public ofBaseApp{

enum Mode {
DYNA,
STOP,
STABLE
};

Mode mode_ { DYNA };
long msgs_ { 0 };
long msgs_out_ { 0 };
long receivers_ { 0 };

bool dyna_ { true } ;
const size_t low_port_ { 2000 };
const size_t high_port_ { 65000 };
size_t port_ { low_port_ };

std::array<std::shared_ptr<Tester>, 10> testers_;


public:
void setup() override;
void update() override;
void draw() override;
void keyPressed(int key) override;

};