Parallel conference bridge #4241
base: master
I like the premise of introducing parallelism. I'm curious whether you have any performance speedup data. It must be noted, though, that the additional compile-time switches will undoubtedly make the conference code much more difficult to maintain and debug should issues arise. So I wonder whether the performance boost is worth the additional complexity.
The OpenMP conference bridge was introduced 3-4 years ago. No tests were performed with a lower OMP_NUM_THREADS value. On compile-time switching: to avoid conditional compilation, we can add an "OpenMP stub" that emulates sequential semantics. See an example here:
I like the idea of adding an additional layer (i.e. OpenMP stub -- perhaps this can be put in pjlib) to avoid compilation-time switches. I believe this is necessary if we want to integrate this, otherwise the conference code will become such a nightmare to read. So for me, I vote towards adopting this (with the condition of removing the compile-time switches). But before we proceed further, let's hear first what others think about this parallel feature. Also, your note about #2251 is interesting. So should we undeprecate/reactivate the callbacks |
Yes, I think it is a good idea to wrap OpenMP in PJLIB (or perhaps PJLIB-UTIL?), for readability/maintainability & platform compatibility. Also, we already use background processing in some places (e.g. job queue, worker thread, event manager), so this new framework (background & multiprocessing) could perhaps standardize them.
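For illustration, the "OpenMP stub" idea discussed above could look roughly like the sketch below. It is not taken from the patch: it assumes the standard `_OPENMP` macro to detect an OpenMP build and stubs only the few runtime calls used by the hypothetical `sum_squares()` helper. A real stub layer would need to cover every OpenMP function the bridge calls.

```c
#include <assert.h>

#ifdef _OPENMP
#  include <omp.h>          /* real OpenMP runtime */
#else
/* Sequential stand-ins: one thread, locks that never contend. */
typedef int omp_lock_t;
static inline int  omp_get_thread_num(void)         { return 0; }
static inline int  omp_get_num_threads(void)        { return 1; }
static inline void omp_init_lock(omp_lock_t *l)     { *l = 0; }
static inline void omp_set_lock(omp_lock_t *l)      { assert(*l == 0); *l = 1; }
static inline void omp_unset_lock(omp_lock_t *l)    { *l = 0; }
static inline void omp_destroy_lock(omp_lock_t *l)  { (void)l; }
#endif

/* Hypothetical caller: the same source builds with or without OpenMP.
 * Without OpenMP the pragma is ignored and the loop runs serially. */
static long sum_squares(const int *v, int n)
{
    long total = 0;
    omp_lock_t lock;
    int i;

    omp_init_lock(&lock);
    #pragma omp parallel for
    for (i = 0; i < n; ++i) {
        long sq = (long)v[i] * v[i];
        omp_set_lock(&lock);      /* protect the shared accumulator */
        total += sq;
        omp_unset_lock(&lock);
    }
    omp_destroy_lock(&lock);
    return total;
}
```

The point of the design is that calling code contains no `#if` blocks of its own; all conditional compilation is confined to the stub header.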
Maybe :)
First of all, thank you for the patch submission again. This is really interesting and exciting, especially to hear that the parallel version can achieve much higher performance than the plain one. However, unfortunately it is implemented using OpenMP... :) OpenMP is quite a "beast" to support. It's (too) high level, too implicit, requires support from many tools (compilers, debuggers), will require changes in build commands, require another skill set to master, and last but not least, not supported by iOS and Android (at least officially). I would very much prefer it to be implemented using If there is something like "thread pool" in pjlib (similar to Python's process pool), would it help? (and more importantly, are you willing to change it to use it :) Or if you want to submit this as is, then I think the best way is to "fork" |
Short answer: Let's try. Please wait for a more detailed answer in a couple of days. I'm a bit busy right now. |
More Detailed Answer

A quick web search shows that OpenMP is supported by the Android NDK starting with r11 (though this information should be verified!). However, iOS still does not officially support OpenMP.

> something like "thread pool" in pjlib

Yes! I’ve always wondered how pjsip works without a thread pool! ☺ As a general rule, Windows programs rarely create threads explicitly. Instead, they register callbacks of various types (e.g., IO, events, timers, etc.) with a thread pool managed by the OS kernel (Microsoft Docs on Thread Pools). With more information about the workload, the OS can manage the thread pool more efficiently than an application, for example, by deciding whether to start a new thread if all pool threads are waiting. It would be highly beneficial to have platform-dependent thread pool support in pjlib!

The main challenge preventing the use of the Windows thread pool API right now is the need to register a pj_thread_t object for each thread, which is inconvenient for threads created and managed outside the application’s control. To avoid memory leaks, a "wrapper" is required to register the thread at the start of the callback and unregister it at the end. However, it's unclear how to make this wrapper completely transparent to the application.

> to "fork" conference.c

The current conference implementation already includes numerous enhancements unrelated to OpenMP:
I propose integrating these enhancements first, then creating a "forked" version, and only after that proceeding with further work on parallelism.

"In the real world"

In practice, I use a heavily optimized IOCP queue and RTP transport. I haven’t tested the standard ioqueue implementation, so I’m unsure how it behaves in multithreaded scenarios. It’s possible that both ioqueue and transport may require optimization after parallelizing the conference bridge (in my case, the bridge was optimized last).
Thanks for the Android OpenMP info. The initial idea for the thread pool is a high-level pool of pj threads, unlike the Windows thread pool, which is a low-level OS object, I think. Let me check whether it can be abstracted using the same API. But the main objective of the pj thread pool is to execute N jobs using M threads, as a replacement for OpenMP. Yes, I noticed there are many other changes in conf unrelated to parallelism. It would be better to submit them as separate enhancements. Thanks for the communication. Although our high-level API is geared towards clients, the "core" was intended to be highly performant, hence this topic is very interesting to us.
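To make the "N jobs using M threads" objective concrete, here is a simplified sketch using POSIX threads and C11 atomics rather than pjlib. All names (`run_jobs`, `job_batch`, `square_job`) are hypothetical, and unlike a real pool the threads are created per batch instead of being kept alive between batches.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define MAX_WORKERS 16u

typedef void (*job_fn)(unsigned job_index, void *user_data);

typedef struct job_batch {
    job_fn      fn;
    void       *user_data;
    unsigned    job_count;
    atomic_uint next_job;     /* index of the next unclaimed job */
} job_batch;

/* Each thread repeatedly claims the next job index until none are left. */
static void *worker_main(void *arg)
{
    job_batch *b = (job_batch *)arg;
    for (;;) {
        unsigned idx = atomic_fetch_add(&b->next_job, 1u);
        if (idx >= b->job_count)
            break;
        b->fn(idx, b->user_data);
    }
    return NULL;
}

/* Run job_count jobs on up to thread_count threads; the caller is one of them. */
static void run_jobs(job_fn fn, void *user_data,
                     unsigned job_count, unsigned thread_count)
{
    pthread_t tid[MAX_WORKERS];
    job_batch b;
    unsigned i, started = 0;

    assert(fn != NULL);
    if (thread_count < 1u)
        thread_count = 1u;
    if (thread_count > MAX_WORKERS + 1u)
        thread_count = MAX_WORKERS + 1u;

    b.fn = fn;
    b.user_data = user_data;
    b.job_count = job_count;
    atomic_init(&b.next_job, 0u);

    for (i = 0; i + 1u < thread_count; ++i) {
        if (pthread_create(&tid[started], NULL, worker_main, &b) != 0)
            break;            /* continue with fewer threads */
        ++started;
    }
    worker_main(&b);          /* the calling thread takes part too */
    for (i = 0; i < started; ++i)
        pthread_join(tid[i], NULL);
}

/* Example job: square one element of an int array in place. */
static void square_job(unsigned idx, void *user_data)
{
    int *v = (int *)user_data;
    v[idx] *= v[idx];
}
```

With `thread_count` set to 1 no extra threads are started, which mirrors the "zero worker threads" case discussed later in the review.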
universal omp/non omp code
…ence-bridge # Conflicts: # pjmedia/src/pjmedia/conference.c
Hi! Here is the updated version that avoids the use of OpenMP functions and conditional compilation where possible. Instead, only OpenMP pragmas are utilized, though some OpenMP functions are retained for debugging purposes. Key notes:
Minor change:
Could you clarify: does a PJ thread pool currently exist, or is it a planned API? Such an API would significantly simplify the implementation of a multi-threaded conference bridge. Regarding the Windows thread pool API: The primary obstacle to leveraging this well-designed API is the current requirement in PJSIP to register threads without the option to unregister them. A similar challenge arises in the OpenMP version and, more broadly, in scenarios where the user program doesn’t control the creation and termination of threads. Implementing the ability to unregister threads would be highly beneficial. This would enable support for platform-dependent pools like the Windows thread pool. |
Hi, everybody! In this commit besides conference.c:
Briefly about these two points, then about the parallel bridge:

a) To compile the OpenMP conference bridge, the user must add the line #define PJMEDIA_CONF_USE_OPENMP 1 to config_site.h AND configure the development environment to use OpenMP. However, the main motivation for doing so may be testing and comparing the performance of different implementations. My tests show that the OpenMP implementation has no advantage over "native" pj_thread threading.

b) The native multithreading used by the parallel conference bridge relies on a synchronization barrier, which required implementing this API. The API is implemented in 5 (phew!) variants:
About the parallel bridge implementation

The OpenMP-based implementation uses the processor cores allocated to it at 100%. For example, if the host has 40 cores and OMP_NUM_THREADS=8, then 8 cores run at about 100% and 32 at about 0%, so the average load is 20%. If OMP_NUM_THREADS=8 and the host has 8 cores, the load is 100% (in practice 90-100%).

By default, the conference bridge is serial. At compile time, the user can define the macro PJ_CONF_BRIDGE_MAX_THREADS, the number of threads the conference bridge should use. If this value is 1, the bridge is built as a serial bridge; otherwise it is built as a parallel bridge.

The current implementation uses a static thread pool. As an optimization, it would be better to determine the required number of pool threads dynamically.
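The native threading variant relies on a synchronization barrier. As a rough illustration of what such a primitive does, here is a generic mutex-plus-condition-variable barrier written with POSIX threads. It is not one of the variants from the patch, and every name in it (`sketch_barrier`, `barrier_demo`, and so on) is made up for this sketch.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

typedef struct sketch_barrier {
    pthread_mutex_t lock;
    pthread_cond_t  all_arrived;
    unsigned        threshold;    /* threads that must arrive per cycle */
    unsigned        waiting;      /* threads arrived in the current cycle */
    unsigned        generation;   /* bumped each time the barrier opens */
} sketch_barrier;

static int sketch_barrier_init(sketch_barrier *b, unsigned threshold)
{
    assert(b != NULL && threshold > 0);
    b->threshold = threshold;
    b->waiting = 0;
    b->generation = 0;
    if (pthread_mutex_init(&b->lock, NULL) != 0)
        return -1;
    if (pthread_cond_init(&b->all_arrived, NULL) != 0) {
        pthread_mutex_destroy(&b->lock);
        return -1;
    }
    return 0;
}

/* Blocks until `threshold` threads have called it.
 * Returns 1 in the last thread to arrive, 0 in all others. */
static int sketch_barrier_wait(sketch_barrier *b)
{
    int is_last = 0;
    pthread_mutex_lock(&b->lock);
    if (++b->waiting == b->threshold) {
        b->waiting = 0;
        ++b->generation;                      /* release this cycle */
        is_last = 1;
        pthread_cond_broadcast(&b->all_arrived);
    } else {
        unsigned gen = b->generation;
        while (gen == b->generation)          /* guards against spurious wakeups */
            pthread_cond_wait(&b->all_arrived, &b->lock);
    }
    pthread_mutex_unlock(&b->lock);
    return is_last;
}

static void sketch_barrier_destroy(sketch_barrier *b)
{
    pthread_cond_destroy(&b->all_arrived);
    pthread_mutex_destroy(&b->lock);
}

static sketch_barrier demo_bar;
static atomic_int     demo_last_count;

static void *demo_thread(void *arg)
{
    (void)arg;
    atomic_fetch_add(&demo_last_count, sketch_barrier_wait(&demo_bar));
    return NULL;
}

/* Start n threads that meet at the barrier; returns how many of them
 * were reported as "last to arrive". */
static int barrier_demo(unsigned n)
{
    pthread_t tid[8];
    unsigned i;

    assert(n >= 1 && n <= 8);
    atomic_store(&demo_last_count, 0);
    if (sketch_barrier_init(&demo_bar, n) != 0)
        return -1;
    for (i = 0; i < n; ++i) {
        int rc = pthread_create(&tid[i], NULL, demo_thread, NULL);
        assert(rc == 0);
        (void)rc;
    }
    for (i = 0; i < n; ++i)
        pthread_join(tid[i], NULL);
    sketch_barrier_destroy(&demo_bar);
    return atomic_load(&demo_last_count);
}
```

The generation counter is what makes the barrier reusable from one timer tick to the next: a thread that races ahead into the next cycle cannot be confused with one still leaving the previous cycle.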
Thanks! Sorry for the late reply, I'm reviewing it now and will get back with more detailed replies. |
I think in general this is great, thanks for the hard work! I'm okay in general. I haven't reviewed conference.c in detail, because I thought we should address the points below first, since they will modify the patch significantly.
- I would like to inform you about our coding style (which I just wrote up in more detail; previously it was too vague :), pls have a look at https://docs.pjsip.org/en/coding-style/get-started/coding-style.html
- If you have unit test code, it would be good to include it as well (for the barrier, and maybe for conf). If not, I can help with writing it.
- Activating the parallel feature (proposal, others pls comment):
At compile time, the user should have the option to disable the multithreading code. This is controlled by a (new) macro PJMEDIA_CONF_HAS_THREADS, whose default value is PJ_HAS_THREADS. I don't require that the old code must be used when threading is disabled, because this could result in too many code variants.
At run time, the user should have the option to control the number of worker threads to use, from 0 to N. Zero means the operations will be done only by the get_frame() thread. I propose creating a new API as follows:
typedef struct pjmedia_conf_param
{
    unsigned max_slots;
    unsigned sampling_rate;
    unsigned channel_count;
    unsigned samples_per_frame;
    unsigned bits_per_sample;
    unsigned options;
    unsigned worker_threads;    /* 0: only the get_frame() thread does the work */
} pjmedia_conf_param;

PJ_INLINE(void) pjmedia_conf_param_default(pjmedia_conf_param *param)
{
    pj_bzero(param, sizeof(*param));
}

PJ_DEF(pj_status_t) pjmedia_conf_create2(pj_pool_t *pool,
                                         pjmedia_conf_param *param,
                                         pjmedia_conf **p_port);
@bennylp Yes, that would be helpful. I don't have tests.
I don't quite understand what you mean. Currently, the bridge is serial by default (no worker threads are created by default). To use multithreading from a pjsua/pjsua2 application, the user must define the PJ_CONF_BRIDGE_MAX_THREADS macro, which must be > 1 (1 is the get_frame() thread only). Almost everything is done here; however, the latest changes are not fully documented yet. Please review.
implemented compile time and runtime conf bridge multithreading control.
/**
 * Flags that control the behavior of the barrier
 * Supported on Windows platform starting from Windows 8
 * Otherwize, the flags are ignored.
Not very clear whether the flags are also ignored on other platforms.
If so, perhaps add the word: "Only supported ...".
unsigned options;

/**
 * The number of worker threads to use.
Add doc that the value will not be used for switchboard.
Also in pjsua and pjsua2.
@@ -31,7 +34,13 @@
#include <pj/pool.h>
#include <pj/string.h>

#if !defined(PJMEDIA_CONF_USE_SWITCH_BOARD) || PJMEDIA_CONF_USE_SWITCH_BOARD==0
#if defined(PJ_STACK_IMPLEMENTATION)
Since it's not integrated yet (and still not clear whether it will be), better remove this and all the stack usages first for now.
Alternatively, you can re-create a PR for the atomic list as suggested in the previous PR so it can be merged first.
unsigned connect_cnt; /**< Total number of connections */
pjmedia_snd_port *snd_dev_port; /**< Sound device port. */
pjmedia_port *master_port; /**< Port zero's port. */
char master_name_buf[80]; /**< Port0 name buffer. */
pj_mutex_t *mutex; /**< Conference mutex. */
struct conf_port **ports; /**< Array of ports. */
unsigned clock_rate; /**< Sampling rate. */
unsigned sampling_rate;/**< Sampling rate. */
Not sure what the purpose for this variable change?
It causes the modifications to be more extensive than necessary.
#endif
conf->free_port_slots = pj_pool_calloc(pool, conf->max_ports, sizeof(port_slot));
PJ_ASSERT_ON_FAIL(conf->free_port_slots, {pjmedia_conf_destroy(conf); return PJ_ENOMEM;});
unsigned i = conf->max_ports;
put variable declaration on top of the block, otherwise it may fail to compile on older VS.
@@ -2835,7 +3610,7 @@ static pj_status_t get_frame_pasv(pjmedia_port *this_port,
/*
 * Recorder (or passive port) callback.
 */
static pj_status_t put_frame(pjmedia_port *this_port,
static pj_status_t put_frame(pjmedia_port *this_port,
*cosmetic
@@ -856,36 +1140,91 @@ static pj_status_t resume_sound( pjmedia_conf *conf )
/**
 * Destroy conference bridge.
 */
PJ_DEF(pj_status_t) pjmedia_conf_destroy( pjmedia_conf *conf )
PJ_DEF(pj_status_t) pjmedia_conf_destroy(pjmedia_conf *conf)
if possible, refrain from modifying code white spaces. it creates an unnecessarily larger patch that's more difficult to review. I will mark several of these (but not all so it doesn't flood the review) with "*cosmetic" in the comment.
@@ -2845,7 +3620,7 @@ static pj_status_t put_frame(pjmedia_port *this_port,
/* Check for correct size. */
PJ_ASSERT_RETURN( frame->size == conf->samples_per_frame *
conf->bits_per_sample / 8,
PJMEDIA_ENCSAMPLESPFRAME);
PJMEDIA_ENCSAMPLESPFRAME );
*cosmetic
The strictly sequential and single-threaded conference bridge of pjsip must service all connected ports within a single timer tick, inevitably leading to limitations on the number of serviced ports and high CPU performance requirements. The goal of this pull request is to implement parallel servicing of conference bridge ports while preserving the switch behavior as much as possible and minimizing changes to the original codebase.
Parallelism is implemented using the OpenMP C/C++ Application Program Interface (API) (hereinafter referred to as OpenMP), which allows declarative parallel execution of code segments that were not initially designed for parallel processing. OpenMP is cross-platform and supported by the vast majority of C language compilers (and other languages). This solution uses a set of OpenMP tools limited to version 2.0, an older standard version. This ensures that all modern compilers can compile and execute this code correctly.
OpenMP support is disabled by default; each development environment must be explicitly configured to enable it. This keeps the proposed solution compatible with applications that do not require parallel switching: no changes are required for such applications.
The changes to the source code primarily involve two aspects:
Instead of using the shared buffer provided by the master port for all ports to read data, each port reserves its own buffer and reads data into it. This enables parallel reading of data by different ports. (This approach uses memory less efficiently; in practice, a buffer is needed not for each port but for each thread.)
Since data from different input ports may need to be mixed into the buffer of the same output port to create a conference, access to this buffer must be synchronized. For this purpose, each port creates its own lock using OpenMP tools, which is then used to synchronize the mixing of data from different sources.
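The per-port lock described in the second point can be sketched as follows. The description says the locks are created with OpenMP tools; this sketch swaps in a POSIX mutex so that it builds without OpenMP, and the structure and function names (`out_port`, `mix_into`) are illustrative only.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

#define FRAME_SAMPLES 4

/* One listener (output) port with its own mixing lock. */
typedef struct out_port {
    pthread_mutex_t mix_lock;               /* guards mix_buf */
    int32_t         mix_buf[FRAME_SAMPLES]; /* wide accumulator for summed samples */
} out_port;

static int out_port_init(out_port *p)
{
    int i;
    assert(p != NULL);
    for (i = 0; i < FRAME_SAMPLES; ++i)
        p->mix_buf[i] = 0;
    return pthread_mutex_init(&p->mix_lock, NULL);
}

/* Add one source frame into the listener's accumulator.
 * Several threads may call this for the same listener at the same time. */
static void mix_into(out_port *dst, const int16_t *src_frame)
{
    int i;
    assert(dst != NULL && src_frame != NULL);
    pthread_mutex_lock(&dst->mix_lock);
    for (i = 0; i < FRAME_SAMPLES; ++i)
        dst->mix_buf[i] += src_frame[i];
    pthread_mutex_unlock(&dst->mix_lock);
}
```

Because the lock belongs to the listener rather than to the bridge, threads mixing into different listeners never block each other.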
The entire get_frame() function (as before) is divided into three steps:
Each of steps 2 and 3 represents an OpenMP parallel execution region. The steps themselves are executed sequentially, meaning the next step begins only after all tasks from the previous step have been completed by all threads.
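As a toy illustration of two consecutive parallel regions, the sketch below uses only OpenMP pragmas, so it also compiles and runs serially when OpenMP is disabled. The content of each step is an assumption made for the example (step 2 sums the inputs, step 3 gives each port the sum of all other ports); it is not the bridge's actual processing.

```c
#include <assert.h>

#define PORTS 8

typedef struct tick_state {
    unsigned timestamp;     /* advanced once per tick */
    int input[PORTS];       /* stand-in for data read from each port */
    int output[PORTS];      /* stand-in for data delivered to each port */
} tick_state;

static void process_tick(tick_state *s)
{
    int i;
    int total = 0;

    assert(s != 0);

    /* Step 1 (serial): per-tick setup. */
    s->timestamp += 1;

    /* Step 2 (parallel region): one iteration per port. The implicit
     * barrier at the end of the loop means step 3 cannot start until
     * every thread has finished step 2. */
    #pragma omp parallel for reduction(+:total)
    for (i = 0; i < PORTS; ++i)
        total += s->input[i];

    /* Step 3 (parallel region): ports are independent of each other here. */
    #pragma omp parallel for
    for (i = 0; i < PORTS; ++i)
        s->output[i] = total - s->input[i];
}
```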
Unlike step 2, during step 3, the ports operate completely independently; no changes to the switching scheme affect the data processing for the ports. Therefore, the tasks in the asynchronous switching queue are executed concurrently with the main tasks of step 3. To prevent premature deletion of ports with transmitters, the grplock reference count of such ports is incremented during initialization (step 1). The reference count is decremented when the port processing is completed in step 3. This ensures that even if an OP_REMOVE_PORT operation is performed concurrently with data transmission in step 3, the physical resources will only be released once they are no longer in use.
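The reference-counting argument can be traced with a small model. The patch uses the grplock reference count; the sketch below substitutes a plain C11 atomic counter, and `port_res` with its helpers is a hypothetical stand-in.

```c
#include <assert.h>
#include <stdatomic.h>

typedef struct port_res {
    atomic_int ref_cnt;
    int        released;    /* set once the resources would be freed */
} port_res;

/* A new port starts with one reference held by its owner. */
static void port_init(port_res *p)
{
    assert(p != 0);
    atomic_init(&p->ref_cnt, 1);
    p->released = 0;
}

static void port_add_ref(port_res *p)
{
    atomic_fetch_add(&p->ref_cnt, 1);
}

static void port_dec_ref(port_res *p)
{
    /* atomic_fetch_sub returns the previous value: 1 means this was the last reference. */
    if (atomic_fetch_sub(&p->ref_cnt, 1) == 1)
        p->released = 1;
}
```

In the scenario from the paragraph above, step 1 calls `port_add_ref()`, a concurrent remove operation calls `port_dec_ref()` for the owner's reference, and only the `port_dec_ref()` at the end of step 3 drops the count to zero.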
In this version, the remaining risk of deadlocks after the introduction of asynchronous switching #3928 has also been resolved. The grplock handler, called within the OP_REMOVE_PORT operation under the protection of the conference bridge mutex, could previously acquire other locks in an unpredictable order. This was a potential source of deadlocks. However, the asynchronous switching algorithm ensures that such operations are executed by only one thread at a time. Therefore, these operations do not require additional synchronization. For this reason, the execution of asynchronous operations has been moved out from under the protection of the conference bridge mutex. This mutex is required only for the asynchronous operation queue, not for the operations themselves.
Other changes in the code are minor and not critical to the overall optimization concept.
For instance, the aforementioned step 1 does not initialize the output port buffers. Instead, ports store a "timestamp" of the last frame loaded into the buffer. If this timestamp differs from the timestamp of the frame being loaded, the first (and possibly only) frame is loaded into the buffer. This allows for straightforward copying without prior initialization and without unnecessary summing with zero. Importantly, the timestamp itself only increases and, therefore, generally does not require separate initialization.
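The timestamp technique can be sketched in a few lines. The names (`mix_target`, `mix_frame`) are illustrative, and the sketch assumes that the first real frame timestamp differs from the initial `last_ts` value.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define FRAME_SAMPLES 4

typedef struct mix_target {
    uint64_t last_ts;                   /* timestamp of the frame held in mix_buf */
    int32_t  mix_buf[FRAME_SAMPLES];
} mix_target;

static void mix_frame(mix_target *dst, const int32_t *src, uint64_t frame_ts)
{
    int i;
    assert(dst != NULL && src != NULL);
    if (dst->last_ts != frame_ts) {
        /* First frame of this tick: overwrite whatever is left from the
         * previous tick, so the buffer never has to be zeroed. */
        memcpy(dst->mix_buf, src, sizeof(dst->mix_buf));
        dst->last_ts = frame_ts;
    } else {
        /* Later frames of the same tick are summed in. */
        for (i = 0; i < FRAME_SAMPLES; ++i)
            dst->mix_buf[i] += src[i];
    }
}
```

A port with a single transmitter therefore costs one copy per tick, with no zero-fill and no addition.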
The OP_ADD_PORT operation has been excluded from asynchronous operations. This version of the conference bridge does not use port counters or the is_new flag. Instead, lower and upper bounds for the range of active ports are maintained. An "active" port is defined as one that is connected to something, i.e., has a non-zero number of listeners or transmitters. Only such ports participate in the sound transmission process. Immediately after being added, a port is not yet active and does not affect the operation of the conference bridge. The appearance of such a port concurrently with the execution of get_frame() does not influence the execution of steps 1-3 in any way.
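A minimal model of the active-range idea follows. The field and function names are assumptions for the sketch, and the bounds are recomputed with a full scan for simplicity, whereas an implementation would more likely adjust them incrementally on connect and disconnect.

```c
#include <assert.h>

#define MAX_PORTS 16

typedef struct bridge {
    unsigned listener_cnt[MAX_PORTS];
    unsigned transmitter_cnt[MAX_PORTS];
    unsigned lower;     /* first active slot */
    unsigned upper;     /* one past the last active slot; lower == upper means none */
} bridge;

/* A port is active when it is connected to something. */
static int is_active(const bridge *b, unsigned slot)
{
    return b->listener_cnt[slot] != 0 || b->transmitter_cnt[slot] != 0;
}

static void update_bounds(bridge *b)
{
    unsigned i;
    b->lower = b->upper = 0;
    for (i = 0; i < MAX_PORTS; ++i) {
        if (!is_active(b, i))
            continue;
        if (b->lower == b->upper)   /* first active slot found */
            b->lower = i;
        b->upper = i + 1;
    }
}

/* Connect src -> sink: src gains a listener, sink gains a transmitter. */
static void connect_ports(bridge *b, unsigned src, unsigned sink)
{
    assert(src < MAX_PORTS && sink < MAX_PORTS);
    b->listener_cnt[src]++;
    b->transmitter_cnt[sink]++;
    update_bounds(b);
}
```

A newly added but unconnected port changes neither bound, so a tick that iterates over `[lower, upper)` never sees it.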
Creating new ports can also be performed "practically" in parallel. Ideally, it would be "fully parallel," but reserving a slot in the port array, though performed in O(1) time, still requires mutex locking for a short duration (by default, pj_stack is not used). Once the slot is reserved, further creation and initialization of the port within the reserved slot can be carried out concurrently with similar actions in other slots.
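One way to get O(1) slot reservation under a short lock is a stack of free slot indices, sketched here with a POSIX mutex. This is an assumption about the general shape, not the patch's code, and `slot_pool` and its functions are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_PORTS 4
#define NO_SLOT   (-1)

typedef struct slot_pool {
    pthread_mutex_t lock;
    int free_slots[MAX_PORTS];  /* stack of unused slot indices */
    int free_cnt;
} slot_pool;

static int slot_pool_init(slot_pool *sp)
{
    int i;
    assert(sp != NULL);
    /* Push in reverse so that slot 0 is handed out first. */
    for (i = 0; i < MAX_PORTS; ++i)
        sp->free_slots[i] = MAX_PORTS - 1 - i;
    sp->free_cnt = MAX_PORTS;
    return pthread_mutex_init(&sp->lock, NULL);
}

/* O(1): the mutex is held only while popping one index. */
static int slot_reserve(slot_pool *sp)
{
    int slot = NO_SLOT;
    pthread_mutex_lock(&sp->lock);
    if (sp->free_cnt > 0)
        slot = sp->free_slots[--sp->free_cnt];
    pthread_mutex_unlock(&sp->lock);
    return slot;
}

static void slot_release(slot_pool *sp, int slot)
{
    assert(slot >= 0 && slot < MAX_PORTS);
    pthread_mutex_lock(&sp->lock);
    assert(sp->free_cnt < MAX_PORTS);
    sp->free_slots[sp->free_cnt++] = slot;
    pthread_mutex_unlock(&sp->lock);
}
```

Everything after `slot_reserve()` returns (allocating buffers, initializing the port) touches only the reserved slot and can proceed without the lock.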
The only expected change in behavior is not directly related to parallelism and involves special handling of the PJ_EEOF code returned by read_port(). This code is interpreted as a signal from the port that it no longer has and will not produce any new data (e.g., a fileplayer has reached the end of the file). In this case, rx_setting = PJMEDIA_PORT_DISABLE is triggered, ensuring that no further attempts are made to retrieve data from the port. This also prevents repeated triggering of the eof_cb() and eof_cb2() callbacks, sparing the application from unnecessary calls.
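The end-of-file behavior can be modeled with stand-in types. `SK_EEOF`, `rx_state` and `src_port` below are placeholders for `PJ_EEOF`, the port's rx setting and the real port structure, and the toy `read_port()` only counts frames.

```c
#include <assert.h>

enum { SK_SUCCESS = 0, SK_EEOF = 1 };               /* stand-ins for PJ_SUCCESS / PJ_EEOF */
typedef enum { RX_ENABLED, RX_DISABLED } rx_state;  /* stand-in for the rx setting */

typedef struct src_port {
    int      frames_left;     /* frames the source can still deliver */
    rx_state rx_setting;
    int      eof_callbacks;   /* how many times the EOF callback fired */
} src_port;

/* Toy read: delivers frames until exhausted, then reports EOF. */
static int read_port(src_port *p)
{
    if (p->frames_left == 0) {
        p->eof_callbacks++;   /* models eof_cb()/eof_cb2() */
        return SK_EEOF;
    }
    p->frames_left--;
    return SK_SUCCESS;
}

/* Called once per tick for each source port. */
static void service_port(src_port *p)
{
    assert(p != 0);
    if (p->rx_setting == RX_DISABLED)
        return;                        /* never read from a finished source again */
    if (read_port(p) == SK_EEOF)
        p->rx_setting = RX_DISABLED;   /* EOF is final: stop polling this port */
}
```

However many ticks follow, the EOF callback in this model fires exactly once.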
Incidentally, the asynchronous switching implementation in PR #3928 resolved many deadlock-related issues. Among other improvements, it eliminated the need to prohibit very convenient synchronous callbacks (see #2251).