Skip to content

Commit

Permalink
Dedicated internal API for transport threads
Browse files Browse the repository at this point in the history
This commit should not change any internal logic. It will simplify
internal architecture by wrapping transport thread data with dedicated
structure. It might be useful in future development where transport may
be associated with more than one thread.
  • Loading branch information
arkq committed Jan 9, 2021
1 parent 9c0fae7 commit fbe0fc6
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 260 deletions.
153 changes: 84 additions & 69 deletions src/a2dp-audio.c

Large diffs are not rendered by default.

199 changes: 116 additions & 83 deletions src/ba-transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static int transport_pcm_init(
return 0;
}

static void transport_pcm_destroy(
static void transport_pcm_free(
struct ba_transport_pcm *pcm) {

ba_transport_pcm_release(pcm);
Expand All @@ -96,6 +96,52 @@ static void transport_pcm_destroy(

}

static int transport_thread_init(
struct ba_transport_thread *th,
struct ba_transport *t) {

th->t = t;
th->id = config.main_thread;
th->pipe[0] = -1;
th->pipe[1] = -1;

if (pipe(th->pipe) == -1)
return -1;

return 0;
}

/**
* Synchronous transport thread cancellation. */
static void transport_thread_cancel(struct ba_transport_thread *th) {

if (pthread_equal(th->id, config.main_thread) ||
pthread_equal(th->id, pthread_self()))
return;

int err;
if ((err = pthread_cancel(th->id)) != 0 && err != ESRCH)
warn("Couldn't cancel transport thread: %s", strerror(err));
if ((err = pthread_join(th->id, NULL)) != 0)
warn("Couldn't join transport thread: %s", strerror(err));

/* Indicate that the thread has been successfully terminated. Also,
* make sure, that after termination, this thread handler will not
* be used anymore. */
th->id = config.main_thread;

}

/**
* Release transport thread resources. */
static void transport_thread_free(
struct ba_transport_thread *th) {
if (th->pipe[0] != -1)
close(th->pipe[0]);
if (th->pipe[1] != -1)
close(th->pipe[1]);
}

/**
* Create new transport.
*
Expand All @@ -122,20 +168,16 @@ static struct ba_transport *transport_new(

pthread_mutex_init(&t->mutex, NULL);

t->thread = config.main_thread;

t->bt_fd = -1;
t->sig_fd[0] = -1;
t->sig_fd[1] = -1;

if (transport_thread_init(&t->thread, t) != 0)
goto fail;

if ((t->bluez_dbus_owner = strdup(dbus_owner)) == NULL)
goto fail;
if ((t->bluez_dbus_path = strdup(dbus_path)) == NULL)
goto fail;

if (pipe(t->sig_fd) == -1)
goto fail;

pthread_mutex_lock(&device->transports_mutex);
g_hash_table_insert(device->transports, t->bluez_dbus_path, t);
pthread_mutex_unlock(&device->transports_mutex);
Expand Down Expand Up @@ -283,26 +325,6 @@ struct ba_transport *ba_transport_ref(
return t;
}

/**
* Synchronous transport thread cancellation. */
static void ba_transport_pthread_cancel(struct ba_transport *t) {

if (pthread_equal(t->thread, config.main_thread) ||
pthread_equal(t->thread, pthread_self()))
return;

int err;
if ((err = pthread_cancel(t->thread)) != 0 && err != ESRCH)
warn("Couldn't cancel transport thread: %s", strerror(err));
if ((err = pthread_join(t->thread, NULL)) != 0)
warn("Couldn't join transport thread: %s", strerror(err));

/* Indicate that the thread has been successfully terminated. Also, make sure,
* that after termination, this thread handler will not be used anymore. */
t->thread = config.main_thread;

}

void ba_transport_destroy(struct ba_transport *t) {

/* Remove D-Bus interfaces, so no one will access
Expand All @@ -323,7 +345,7 @@ void ba_transport_destroy(struct ba_transport *t) {
* terminate the IO thread (or at least make sure it is not running any
* more). Not doing so might result in an undefined behavior or even a
* race condition (closed and reused file descriptor). */
ba_transport_pthread_cancel(t);
transport_thread_cancel(&t->thread);

/* terminate on-going PCM connections - exit PCM controllers */
if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_A2DP) {
Expand Down Expand Up @@ -361,25 +383,23 @@ void ba_transport_unref(struct ba_transport *t) {

if (t->bt_fd != -1)
close(t->bt_fd);
if (t->sig_fd[0] != -1)
close(t->sig_fd[0]);
if (t->sig_fd[1] != -1)
close(t->sig_fd[1]);

ba_device_unref(d);

if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_A2DP) {
transport_pcm_destroy(&t->a2dp.pcm);
transport_pcm_destroy(&t->a2dp.pcm_bc);
transport_pcm_free(&t->a2dp.pcm);
transport_pcm_free(&t->a2dp.pcm_bc);
free(t->a2dp.configuration);
}
else if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_SCO) {
if (t->sco.rfcomm != NULL)
ba_rfcomm_destroy(t->sco.rfcomm);
transport_pcm_destroy(&t->sco.spk_pcm);
transport_pcm_destroy(&t->sco.mic_pcm);
transport_pcm_free(&t->sco.spk_pcm);
transport_pcm_free(&t->sco.mic_pcm);
}

transport_thread_free(&t->thread);

pthread_mutex_destroy(&t->mutex);
free(t->bluez_dbus_owner);
free(t->bluez_dbus_path);
Expand All @@ -395,26 +415,6 @@ void ba_transport_pcm_unref(struct ba_transport_pcm *pcm) {
ba_transport_unref(pcm->t);
}

int ba_transport_send_signal(struct ba_transport *t, enum ba_transport_signal sig) {
return write(t->sig_fd[1], &sig, sizeof(sig));
}

enum ba_transport_signal ba_transport_recv_signal(struct ba_transport *t) {

enum ba_transport_signal sig;
ssize_t ret;

while ((ret = read(t->sig_fd[0], &sig, sizeof(sig))) == -1 &&
errno == EINTR)
continue;

if (ret == sizeof(sig))
return sig;

warn("Couldn't read transport signal: %s", strerror(errno));
return BA_TRANSPORT_SIGNAL_PING;
}

int ba_transport_select_codec_a2dp(
struct ba_transport *t,
const struct a2dp_sep *sep) {
Expand Down Expand Up @@ -626,20 +626,25 @@ void ba_transport_set_codec(

int ba_transport_start(struct ba_transport *t) {

if (!pthread_equal(t->thread, config.main_thread))
if (!pthread_equal(t->thread.id, config.main_thread))
return 0;

debug("Starting transport: %s", ba_transport_type_to_string(t->type));

if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_A2DP)
return a2dp_audio_thread_create(t);
if (t->type.profile & BA_TRANSPORT_PROFILE_MASK_SCO)
return ba_transport_pthread_create(t, sco_thread, "ba-sco");
return ba_transport_thread_create(&t->thread, sco_thread, "ba-sco");

errno = ENOTSUP;
return -1;
}

int ba_transport_stop(struct ba_transport *t) {
transport_thread_cancel(&t->thread);
return 0;
}

int ba_transport_set_a2dp_state(
struct ba_transport *t,
enum bluez_a2dp_transport_state state) {
Expand All @@ -655,8 +660,7 @@ int ba_transport_set_a2dp_state(
return ba_transport_start(t);
case BLUEZ_A2DP_TRANSPORT_STATE_IDLE:
default:
ba_transport_pthread_cancel(t);
return 0;
return ba_transport_stop(t);
}
}

Expand Down Expand Up @@ -724,13 +728,13 @@ int ba_transport_pcm_volume_update(struct ba_transport_pcm *pcm) {
}

int ba_transport_pcm_pause(struct ba_transport_pcm *pcm) {
ba_transport_send_signal(pcm->t, BA_TRANSPORT_SIGNAL_PCM_PAUSE);
ba_transport_thread_send_signal(&pcm->t->thread, BA_TRANSPORT_SIGNAL_PCM_PAUSE);
debug("PCM paused: %d", pcm->fd);
return 0;
}

int ba_transport_pcm_resume(struct ba_transport_pcm *pcm) {
ba_transport_send_signal(pcm->t, BA_TRANSPORT_SIGNAL_PCM_RESUME);
ba_transport_thread_send_signal(&pcm->t->thread, BA_TRANSPORT_SIGNAL_PCM_RESUME);
debug("PCM resumed: %d", pcm->fd);
return 0;
}
Expand All @@ -739,12 +743,12 @@ int ba_transport_pcm_drain(struct ba_transport_pcm *pcm) {

struct ba_transport *t = pcm->t;

if (pthread_equal(t->thread, config.main_thread))
if (pthread_equal(t->thread.id, config.main_thread))
return errno = ESRCH, -1;

pthread_mutex_lock(&pcm->synced_mtx);

ba_transport_send_signal(t, BA_TRANSPORT_SIGNAL_PCM_SYNC);
ba_transport_thread_send_signal(&t->thread, BA_TRANSPORT_SIGNAL_PCM_SYNC);
pthread_cond_wait(&pcm->synced, &pcm->synced_mtx);

pthread_mutex_unlock(&pcm->synced_mtx);
Expand All @@ -764,7 +768,7 @@ int ba_transport_pcm_drain(struct ba_transport_pcm *pcm) {
}

int ba_transport_pcm_drop(struct ba_transport_pcm *pcm) {
ba_transport_send_signal(pcm->t, BA_TRANSPORT_SIGNAL_PCM_DROP);
ba_transport_thread_send_signal(&pcm->t->thread, BA_TRANSPORT_SIGNAL_PCM_DROP);
debug("PCM dropped: %d", pcm->fd);
return 0;
}
Expand Down Expand Up @@ -945,7 +949,7 @@ int ba_transport_pcm_release(struct ba_transport_pcm *pcm) {
* execution. In this release function it is important to perform actions
* atomically. Since close call is a cancellation point, it is required to
* temporally disable cancellation. For a better understanding of what is
* going on, see the io_thread_read_pcm() function. */
* going on, see the ba_transport_pcm_read() function. */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);

debug("Closing PCM: %d", pcm->fd);
Expand All @@ -958,57 +962,86 @@ int ba_transport_pcm_release(struct ba_transport_pcm *pcm) {

/**
* Create transport thread. */
int ba_transport_pthread_create(
struct ba_transport *t,
void *(*routine)(struct ba_transport *),
int ba_transport_thread_create(
struct ba_transport_thread *th,
void *(*routine)(struct ba_transport_thread *),
const char *name) {

struct ba_transport *t = th->t;
int ret;

if ((ret = pthread_create(&t->thread, NULL,
PTHREAD_ROUTINE(routine), ba_transport_ref(t))) != 0) {
ba_transport_ref(t);
if ((ret = pthread_create(&th->id, NULL, PTHREAD_ROUTINE(routine), th)) != 0) {
error("Couldn't create transport thread: %s", strerror(ret));
t->thread = config.main_thread;
th->id = config.main_thread;
ba_transport_unref(t);
return -1;
}

pthread_setname_np(t->thread, name);
debug("Created new thread [%s]: %s", name, ba_transport_type_to_string(t->type));
pthread_setname_np(th->id, name);
debug("Created new transport thread [%s]: %s",
name, ba_transport_type_to_string(t->type));

return 0;
}

int ba_transport_thread_send_signal(
struct ba_transport_thread *th,
enum ba_transport_signal sig) {
return write(th->pipe[1], &sig, sizeof(sig));
}

enum ba_transport_signal ba_transport_thread_recv_signal(
struct ba_transport_thread *th) {

enum ba_transport_signal sig;
ssize_t ret;

while ((ret = read(th->pipe[0], &sig, sizeof(sig))) == -1 &&
errno == EINTR)
continue;

if (ret == sizeof(sig))
return sig;

warn("Couldn't read transport thread signal: %s", strerror(errno));
return BA_TRANSPORT_SIGNAL_PING;
}

/**
* Wrapper for release callback, which can be used by the pthread cleanup.
*
* This function CAN be used with ba_transport_pthread_cleanup_lock() in order
* to guard transport critical section during cleanup process. */
void ba_transport_pthread_cleanup(struct ba_transport *t) {
* This function CAN be used with ba_transport_thread_cleanup_lock() in order
* to guard transport thread critical section during cleanup process. */
void ba_transport_thread_cleanup(struct ba_transport_thread *th) {

struct ba_transport *t = th->t;

/* During the normal operation mode, the release callback should not
* be NULL. Hence, we will relay on this callback - file descriptors
* are closed in it. */
if (t->release != NULL)
t->release(t);

ba_transport_pthread_cleanup_unlock(t);
ba_transport_thread_cleanup_unlock(th);

/* XXX: If the order of the cleanup push is right, this function will
* indicate the end of the IO/RFCOMM thread. */
* indicate the end of the transport IO thread. */
debug("Exiting IO thread: %s", ba_transport_type_to_string(t->type));

/* Remove reference which was taken by the io_thread_create(). */
/* Remove reference which was taken by the ba_transport_thread_create(). */
ba_transport_unref(t);
}

int ba_transport_pthread_cleanup_lock(struct ba_transport *t) {
int ba_transport_thread_cleanup_lock(struct ba_transport_thread *th) {
struct ba_transport *t = th->t;
int ret = pthread_mutex_lock(&t->mutex);
t->cleanup_lock = true;
return ret;
}

int ba_transport_pthread_cleanup_unlock(struct ba_transport *t) {
int ba_transport_thread_cleanup_unlock(struct ba_transport_thread *th) {
struct ba_transport *t = th->t;
if (!t->cleanup_lock)
return 0;
t->cleanup_lock = false;
Expand Down
Loading

0 comments on commit fbe0fc6

Please sign in to comment.