Skip to content

Commit

Permalink
Fix thread safety for IO threads stopping
Browse files Browse the repository at this point in the history
We can not request transport IO threads stopping from more than one
thread at the same time. This will result in more than one cancel
threads request being queued in the thread manager, which in turn may
lead to premature thread termination in case when IO thread is stopped
and started right away.

This commit adds IO threads stopping synchronization on the transport
level by using the stopping flag.
  • Loading branch information
arkq committed Mar 25, 2023
1 parent 88fb35c commit 2f874bb
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 33 deletions.
89 changes: 60 additions & 29 deletions src/ba-transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ static void transport_threads_cancel(struct ba_transport *t) {
t->stopping = false;
pthread_mutex_unlock(&t->bt_fd_mtx);

pthread_cond_signal(&t->stopped);
pthread_cond_broadcast(&t->stopped);

}

Expand All @@ -519,26 +519,23 @@ static void transport_threads_cancel_if_no_clients(struct ba_transport *t) {

bool stop = false;

if (t->stopping)
goto final;

if (t->profile & BA_TRANSPORT_PROFILE_A2DP_SOURCE) {
/* Release bidirectional A2DP transport only in case when there
* is no active PCM connection - neither encoder nor decoder. */
if (t->a2dp.pcm.fd == -1 && t->a2dp.pcm_bc.fd == -1)
t->stopping = stop = true;
}
else if (t->profile & BA_TRANSPORT_PROFILE_MASK_AG) {
/* For Audio Gateway profile it is required to release SCO if we
* are not transferring audio (not sending nor receiving), because
* it will free Bluetooth bandwidth - headset will send microphone
* signal even though we are not reading it! */
if (t->sco.spk_pcm.fd == -1 && t->sco.mic_pcm.fd == -1)
t->stopping = stop = true;
if (!t->stopping) {
if (t->profile & BA_TRANSPORT_PROFILE_A2DP_SOURCE) {
/* Release bidirectional A2DP transport only in case when there
* is no active PCM connection - neither encoder nor decoder. */
if (t->a2dp.pcm.fd == -1 && t->a2dp.pcm_bc.fd == -1)
t->stopping = stop = true;
}
else if (t->profile & BA_TRANSPORT_PROFILE_MASK_AG) {
/* For Audio Gateway profile it is required to release SCO if we
* are not transferring audio (not sending nor receiving), because
* it will free Bluetooth bandwidth - headset will send microphone
* signal even though we are not reading it! */
if (t->sco.spk_pcm.fd == -1 && t->sco.mic_pcm.fd == -1)
t->stopping = stop = true;
}
}

final:

pthread_mutex_unlock(&t->bt_fd_mtx);

if (stop) {
Expand Down Expand Up @@ -1536,6 +1533,39 @@ int ba_transport_start(struct ba_transport *t) {
return -1;
}

/**
* Schedule transport IO threads cancellation. */
static int ba_transport_stop_async(struct ba_transport *t) {

#if DEBUG
/* Assert that we were called with the lock held, so we
* can safely check and modify the stopping flag. */
g_assert_cmpint(pthread_mutex_trylock(&t->bt_fd_mtx), !=, 0);
#endif

if (t->stopping)
return 0;

t->stopping = true;

/* Unlock the mutex before updating thread states. This is necessary to avoid
* lock order inversion with the code in the ba_transport_thread_bt_acquire()
* function. It is safe to do so, because we have already set the stopping
* flag, so the transport_threads_cancel() function will not be called before
* we acquire the lock again. */
pthread_mutex_unlock(&t->bt_fd_mtx);

ba_transport_thread_state_set_stopping(&t->thread_enc);
ba_transport_thread_state_set_stopping(&t->thread_dec);

pthread_mutex_lock(&t->bt_fd_mtx);

if (transport_thread_manager_send_command(t, BA_TRANSPORT_THREAD_MANAGER_CANCEL_THREADS) != 0)
return -1;

return 0;
}

/**
* Stop transport IO threads.
*
Expand All @@ -1547,16 +1577,17 @@ int ba_transport_stop(struct ba_transport *t) {
ba_transport_thread_state_check_terminated(&t->thread_dec))
return 0;

ba_transport_thread_state_set_stopping(&t->thread_enc);
ba_transport_thread_state_set_stopping(&t->thread_dec);
pthread_mutex_lock(&t->bt_fd_mtx);

if (transport_thread_manager_send_command(t, BA_TRANSPORT_THREAD_MANAGER_CANCEL_THREADS) != 0)
return -1;
int rv;
if ((rv = ba_transport_stop_async(t)) == -1)
goto fail;

int rv = 0;
rv |= ba_transport_thread_state_wait_terminated(&t->thread_enc);
rv |= ba_transport_thread_state_wait_terminated(&t->thread_dec);
while (t->stopping)
pthread_cond_wait(&t->stopped, &t->bt_fd_mtx);

fail:
pthread_mutex_unlock(&t->bt_fd_mtx);
return rv;
}

Expand Down Expand Up @@ -1888,9 +1919,9 @@ void ba_transport_thread_cleanup(struct ba_transport_thread *th) {
/* For proper functioning of the transport, all threads have to be
* operational. Therefore, if one of the threads is being cancelled,
* we have to cancel all other threads. */
ba_transport_thread_state_set_stopping(&t->thread_enc);
ba_transport_thread_state_set_stopping(&t->thread_dec);
transport_thread_manager_send_command(t, BA_TRANSPORT_THREAD_MANAGER_CANCEL_THREADS);
pthread_mutex_lock(&t->bt_fd_mtx);
ba_transport_stop_async(t);
pthread_mutex_unlock(&t->bt_fd_mtx);

/* Release BT socket file descriptor duplicate created either in the
* ba_transport_thread_create() function or in the IO thread itself. */
Expand Down
6 changes: 2 additions & 4 deletions test/test-io.c
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,13 @@ static void test_io(struct ba_transport *t_src, struct ba_transport *t_snk,
ba_transport_pcm_release(t_src_pcm);
pthread_mutex_unlock(&t_src_pcm->mutex);

ba_transport_thread_state_set_stopping(&t_src->thread_enc);
transport_thread_cancel(&t_src->thread_enc);
ba_transport_stop(t_src);

pthread_mutex_lock(&t_snk_pcm->mutex);
ba_transport_pcm_release(t_snk_pcm);
pthread_mutex_unlock(&t_snk_pcm->mutex);

ba_transport_thread_state_set_stopping(&t_snk->thread_dec);
transport_thread_cancel(&t_snk->thread_dec);
ba_transport_stop(t_snk);

}

Expand Down

0 comments on commit 2f874bb

Please sign in to comment.