Removed schedulers and compat stuff.

Original commit message from CVS:
Removed schedulers and compat stuff.
Use iterators in bin for the various lookup functions.
Optimized the clock a bit.
Added some more docs.
Fixed scheduling in the ()-(l) case.
Fixed and added some testcases.
Removed old code.
This commit is contained in:
Wim Taymans 2005-01-26 10:56:09 +00:00
parent cec2ab1eac
commit 10e648c7d9
46 changed files with 689 additions and 8414 deletions

View file

@ -1,3 +1,91 @@
2005-01-26 Wim Taymans <wim@fluendo.com>
* docs/design/part-scheduling.txt:
* gst/elements/gstfakesrc.c: (gst_fakesrc_activate):
* gst/elements/gstidentity.c: (gst_identity_chain):
* gst/gst.c:
* gst/gst.h:
* gst/gstbin.c: (gst_bin_set_index), (gst_bin_set_clock),
(gst_bin_set_bus), (gst_bin_set_scheduler), (gst_bin_add_func),
(iterate_child), (gst_bin_iterate_elements),
(iterate_child_recurse), (gst_bin_iterate_recurse),
(gst_bin_iterate_recurse_up), (bin_element_is_sink),
(gst_bin_get_state), (gst_bin_change_state), (gst_bin_dispose),
(compare_name), (gst_bin_get_by_name),
(gst_bin_get_by_name_recurse_up), (compare_interface),
(gst_bin_get_by_interface), (gst_bin_iterate_all_by_interface):
* gst/gstbin.h:
* gst/gstbus.c: (gst_bus_post):
* gst/gstclock.c: (gst_clock_id_ref), (gst_clock_id_unref),
(gst_clock_id_compare_func), (gst_clock_id_wait),
(gst_clock_id_wait_async), (gst_clock_init),
(gst_clock_adjust_unlocked), (gst_clock_get_time):
* gst/gstclock.h:
* gst/gstcompat.h:
* gst/gstelement.c: (gst_element_add_pad),
(gst_element_remove_pad), (iterate_pad),
(gst_element_iterate_pads), (gst_element_get_state_func),
(gst_element_set_state), (gst_element_pads_activate),
(gst_element_change_state), (gst_element_dispose):
* gst/gstformat.c: (gst_format_iterate_definitions):
* gst/gstiterator.c: (gst_iterator_init), (gst_iterator_new),
(gst_list_iterator_next), (gst_list_iterator_free),
(gst_iterator_new_list), (gst_iterator_pop), (gst_iterator_next),
(gst_iterator_resync), (gst_iterator_free), (gst_iterator_push),
(gst_iterator_filter), (gst_iterator_foreach),
(gst_iterator_find_custom):
* gst/gstiterator.h:
* gst/gstobject.c: (gst_object_class_init), (gst_object_ref),
(gst_object_unref), (gst_object_sink), (gst_object_dispose),
(gst_object_dispatch_properties_changed), (gst_object_set_name),
(gst_object_set_parent), (gst_object_unparent),
(gst_object_check_uniqueness), (gst_object_get_path_string):
* gst/gstpad.c: (gst_pad_set_active), (gst_pad_is_active),
(gst_pad_set_blocked_async), (gst_pad_set_acceptcaps_function),
(gst_pad_set_fixatecaps_function), (gst_pad_unlink),
(gst_pad_link_prepare_filtered), (gst_pad_link_filtered),
(gst_pad_relink_filtered), (gst_real_pad_get_caps_unlocked),
(gst_pad_peer_get_caps), (gst_pad_fixate_caps),
(gst_pad_accept_caps), (gst_pad_peer_accept_caps),
(gst_pad_set_caps), (gst_pad_configure_sink),
(gst_pad_configure_src), (gst_pad_realize), (gst_pad_alloc_buffer),
(gst_pad_push), (gst_pad_pull_range), (gst_pad_push_event),
(gst_pad_send_event):
* gst/gstpipeline.c: (is_eos), (pipeline_bus_handler),
(gst_pipeline_change_state):
* gst/gstquery.c: (gst_query_type_iterate_definitions):
* gst/gstscheduler.c:
* gst/gstscheduler.h:
* gst/gstsystemclock.c: (gst_system_clock_init),
(gst_system_clock_async_thread),
(gst_system_clock_id_wait_unlocked), (gst_system_clock_id_wait),
(gst_system_clock_id_wait_async), (gst_system_clock_id_unschedule):
* gst/gsttask.h:
* gst/gsttypes.h:
* gst/schedulers/cothreads_compat.h:
* gst/schedulers/entryscheduler.c:
* gst/schedulers/faircothreads.c:
* gst/schedulers/faircothreads.h:
* gst/schedulers/fairscheduler.c:
* gst/schedulers/gstbasicscheduler.c:
* gst/schedulers/gstoptimalscheduler.c:
* gst/schedulers/gthread-cothreads.h:
* gst/schedulers/threadscheduler.c:
(gst_thread_scheduler_class_init), (gst_thread_scheduler_func):
* testsuite/bins/interface.c: (main):
* testsuite/dlopen/loadgst.c: (do_test):
* testsuite/states/.cvsignore:
* testsuite/states/Makefile.am:
* testsuite/states/stress.c: (message_received), (state_change),
(main):
Removed schedulers and compat stuff.
Use iterators in bin for the various lookup functions.
Optimized the clock a bit.
Added some more docs.
Fixed scheduling in the ()-(l) case.
Fixed and added some testcases.
Removed old code.
2005-01-19 Wim Taymans <wim@fluendo.com>
* gst/elements/gstfilesrc.c: (gst_filesrc_getrange),

View file

@ -67,7 +67,8 @@ Multi-sink elements
----------
If the chain based sink wants to wait for one of the pads to receive a buffer, just
implement the action to perform in the chain function.
implement the action to perform in the chain function. Be aware that the action could
be performed in different threads and possibly simultaneously so grab the STREAM_LOCK.
Collect pads
------------
@ -176,3 +177,25 @@ Cases
| src--sink src--sink src--sink |
+---------+ +----------+ +------------+ +----------+
(l-g) (c-l) (g) (l) () (c)
* fakesink has a chain function and the peer pad has no
loop function, no scheduling is done.
* avidemuxer and identity expose an (g) - (l) connection,
a thread is created to call the sinkpad loop function.
* identity knows the srcpad is getrange based and uses the
thread from avidemux to getrange data from filesrc.
+---------+ +----------+ +------------+ +----------+
| filesrc | | identity | | oggdemuxer | | fakesink |
| src--sink src--sink src--sink |
+---------+ +----------+ +------------+ +----------+
(l-g) (c) () (l-c) () (c)
* fakesink has a chain function and the peer pad has no
loop function, no scheduling is done.
* oggdemuxer and identity expose an () - (l-c) connection,
oggdemux has to operate in chain mode.
* identity chan only work chain based and so filesrc creates
a thread to push data to identity.

View file

@ -861,48 +861,6 @@ done:
GST_STREAM_UNLOCK (pad);
}
#if 0
/**
* gst_fakesrc_loop:
* @element: the faksesrc to loop
*
* generate an empty buffer and push it to the next element.
*/
static gboolean
gst_fakesrc_loop (GstPad * pad)
{
GstFakeSrc *src;
const GList *pads;
GstTask *task;
src = GST_FAKESRC (GST_PAD_PARENT (pad));
task = src->task;
pads = GST_ELEMENT (src)->pads;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
GstBuffer *buffer;
GstFlowReturn ret;
ret = gst_fakesrc_get (pad, &buffer);
if (ret != GST_FLOW_OK) {
return FALSE;
}
ret = gst_pad_push (pad, buffer);
if (ret != GST_FLOW_OK) {
return FALSE;
}
if (src->eos) {
return FALSE;
}
pads = g_list_next (pads);
}
return TRUE;
}
#endif
static gboolean
gst_fakesrc_activate (GstPad * pad, GstActivateMode mode)
{

View file

@ -259,6 +259,7 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer)
{
GstBuffer *buf = GST_BUFFER (buffer);
GstIdentity *identity;
GstFlowReturn result = GST_FLOW_OK;
guint i;
g_return_val_if_fail (pad != NULL, GST_FLOW_ERROR);
@ -365,12 +366,12 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer)
}
identity->bytes_handled += GST_BUFFER_SIZE (buf);
gst_pad_push (identity->srcpad, buf);
result = gst_pad_push (identity->srcpad, buf);
if (identity->sleep_time)
g_usleep (identity->sleep_time);
}
return GST_FLOW_OK;
return result;
}
#if 0

View file

@ -813,33 +813,6 @@ init_popt_callback (poptContext context, enum poptCallbackReason reason,
}
}
/**
* gst_use_threads:
* @use_threads: a #gboolean indicating whether threads should be used
*
* This function is deprecated and should not be used in new code.
* GStreamer requires threads to be enabled at all times.
*/
void
gst_use_threads (gboolean use_threads)
{
}
/**
* gst_has_threads:
*
* This function is deprecated and should not be used in new code.
*
* Queries if GStreamer has threads enabled.
*
* Returns: %TRUE if threads are enabled.
*/
gboolean
gst_has_threads (void)
{
return TRUE;
}
/**
* gst_version:
* @major: pointer to a guint to store the major version number

View file

@ -97,12 +97,6 @@ gboolean gst_init_check_with_popt_table (int *argc, char **argv[],
const GstPoptOption * gst_init_get_popt_table (void);
void gst_use_threads (gboolean use_threads);
gboolean gst_has_threads (void);
void gst_main (void);
void gst_main_quit (void);
G_END_DECLS
#endif /* __GST_H__ */

View file

@ -521,6 +521,13 @@ no_function:
return FALSE;
}
static GstIteratorItem
iterate_child (GstIterator * it, GstElement * child)
{
gst_object_ref (GST_OBJECT (child));
return GST_ITERATOR_ITEM_PASS;
}
/**
* gst_bin_iterate_elements:
* @bin: #Gstbin to iterate the elements of
@ -542,20 +549,76 @@ gst_bin_iterate_elements (GstBin * bin)
g_return_val_if_fail (GST_IS_BIN (bin), NULL);
GST_LOCK (bin);
/* add ref because the iterator refs the bin */
/* add ref because the iterator refs the bin. When the iterator
* is freed it will unref the bin again using the provided dispose
* function. */
gst_object_ref (GST_OBJECT (bin));
result = gst_iterator_new_list (GST_GET_LOCK (bin),
&bin->children_cookie,
&bin->children,
bin,
(GstIteratorRefFunction) gst_object_ref,
(GstIteratorUnrefFunction) gst_object_unref,
(GstIteratorItemFunction) iterate_child,
(GstIteratorDisposeFunction) gst_object_unref);
GST_UNLOCK (bin);
return result;
}
static GstIteratorItem
iterate_child_recurse (GstIterator * it, GstElement * child)
{
gst_object_ref (GST_OBJECT (child));
if (GST_IS_BIN (child)) {
GstIterator *other = gst_bin_iterate_recurse (GST_BIN (child));
gst_iterator_push (it, other);
}
return GST_ITERATOR_ITEM_PASS;
}
/**
* gst_bin_iterate_recurse:
* @bin: #Gstbin to iterate the elements of
*
* Get an iterator for the elements in this bin.
* Each element will have its refcount increased, so unref
* after usage. This iterator recurses into GstBin children.
*
* Returns: a #GstIterator of #GstElements. gst_iterator_free after
* use. returns NULL when passing bad parameters.
*
* MT safe.
*/
GstIterator *
gst_bin_iterate_recurse (GstBin * bin)
{
GstIterator *result;
g_return_val_if_fail (GST_IS_BIN (bin), NULL);
GST_LOCK (bin);
/* add ref because the iterator refs the bin. When the iterator
* is freed it will unref the bin again using the provided dispose
* function. */
gst_object_ref (GST_OBJECT (bin));
result = gst_iterator_new_list (GST_GET_LOCK (bin),
&bin->children_cookie,
&bin->children,
bin,
(GstIteratorItemFunction) iterate_child_recurse,
(GstIteratorDisposeFunction) gst_object_unref);
GST_UNLOCK (bin);
return result;
return NULL;
}
GstIterator *
gst_bin_iterate_recurse_up (GstBin * bin)
{
return NULL;
}
/* returns 0 if the element is a sink, this is made so that
* we can use this function as a filter
*
@ -937,7 +1000,6 @@ gst_bin_dispose (GObject * object)
/* ref to not hit 0 again */
gst_object_ref (GST_OBJECT (object));
gst_element_set_state (GST_ELEMENT (object), GST_STATE_NULL);
while (bin->children) {
gst_bin_remove (bin, GST_ELEMENT (bin->children->data));
@ -949,6 +1011,21 @@ gst_bin_dispose (GObject * object)
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static gint
compare_name (GstElement * element, const gchar * name)
{
gint eq;
GST_LOCK (element);
eq = strcmp (GST_ELEMENT_NAME (element), name) == 0;
GST_UNLOCK (element);
if (eq != 0) {
gst_object_unref (GST_OBJECT (element));
}
return eq;
}
/**
* gst_bin_get_by_name:
* @bin: #Gstbin to search
@ -958,46 +1035,24 @@ gst_bin_dispose (GObject * object)
* function recurses into subbins.
*
* Returns: the element with the given name. Returns NULL if the
* element is not found or when bad parameters were given.
* element is not found or when bad parameters were given. Unref after
* usage.
*
* MT safe.
*/
GstElement *
gst_bin_get_by_name (GstBin * bin, const gchar * name)
{
GList *children;
GstElement *result = NULL;
GstIterator *children;
GstIterator *result;
g_return_val_if_fail (GST_IS_BIN (bin), NULL);
g_return_val_if_fail (name != NULL, NULL);
GST_CAT_INFO (GST_CAT_PARENTAGE, "[%s]: looking up child element %s",
GST_ELEMENT_NAME (bin), name);
children = gst_bin_iterate_recurse (bin);
result = gst_iterator_find_custom (children, (gpointer) name,
(GCompareFunc) compare_name);
/* we recursively lock all elements, this might be a bit too much.. */
GST_LOCK (bin);
for (children = bin->children; children; children = g_list_next (children)) {
GstElement *child = GST_ELEMENT_CAST (children->data);
gboolean eq;
GST_LOCK (child);
eq = strcmp (GST_ELEMENT_NAME (child), name) == 0;
GST_UNLOCK (child);
if (eq) {
result = child;
break;
}
if (GST_IS_BIN (child)) {
result = gst_bin_get_by_name (GST_BIN (child), name);
if (result) {
break;
}
}
}
GST_UNLOCK (bin);
return result;
return GST_ELEMENT_CAST (result);
}
/**
@ -1009,7 +1064,7 @@ gst_bin_get_by_name (GstBin * bin, const gchar * name)
* element is not found, a recursion is performed on the parent bin.
*
* Returns: the element with the given name or NULL when the element
* was not found or bad parameters were given.
* was not found or bad parameters were given. Unref after usage.
*
* MT safe.
*/
@ -1037,6 +1092,23 @@ gst_bin_get_by_name_recurse_up (GstBin * bin, const gchar * name)
return result;
}
static gint
compare_interface (GstElement * element, gpointer interface)
{
gint ret;
if (G_TYPE_CHECK_INSTANCE_TYPE (element, GPOINTER_TO_INT (interface))) {
ret = 0;
} else {
/* we did not find the element, need to release the ref
* added by the iterator */
gst_object_unref (GST_OBJECT (element));
ret = 1;
}
return ret;
}
/**
* gst_bin_get_by_interface:
* @bin: bin to find element in
@ -1046,36 +1118,26 @@ gst_bin_get_by_name_recurse_up (GstBin * bin, const gchar * name)
* interface. If such an element is found, it returns the element. You can
* cast this element to the given interface afterwards.
* If you want all elements that implement the interface, use
* gst_bin_get_all_by_interface(). The function recurses bins inside bins.
* gst_bin_iterate_all_by_interface(). The function recurses bins inside bins.
*
* Returns: An element inside the bin implementing the interface.
* Returns: An element inside the bin implementing the interface. Unref after
* usage.
*
* MT safe.
*/
GstElement *
gst_bin_get_by_interface (GstBin * bin, GType interface)
{
GList *walk;
GstElement *result = NULL;
GstIterator *children;
GstIterator *result;
g_return_val_if_fail (GST_IS_BIN (bin), NULL);
g_return_val_if_fail (G_TYPE_IS_INTERFACE (interface), NULL);
GST_LOCK (bin);
for (walk = bin->children; walk; walk = g_list_next (walk)) {
if (G_TYPE_CHECK_INSTANCE_TYPE (walk->data, interface)) {
result = GST_ELEMENT_CAST (walk->data);
break;
}
if (GST_IS_BIN (walk->data)) {
result = gst_bin_get_by_interface (GST_BIN (walk->data), interface);
if (result)
break;
}
}
GST_UNLOCK (bin);
children = gst_bin_iterate_recurse (bin);
result = gst_iterator_find_custom (children, GINT_TO_POINTER (interface),
(GCompareFunc) compare_interface);
return result;
return GST_ELEMENT_CAST (result);
}
/**
@ -1085,35 +1147,25 @@ gst_bin_get_by_interface (GstBin * bin, GType interface)
*
* Looks for all elements inside the bin that implements the given
* interface. You can safely cast all returned elements to the given interface.
* The function recurses bins inside bins. You need to free the list using
* g_list_free() after use.
* The function recurses bins inside bins. The iterator will return a series
* of #GstElement that should be unreffed after usage.
*
* Returns: The elements inside the bin implementing the interface.
* Returns: An iterator for the elements inside the bin implementing the interface.
*
* MT safe.
*/
GList *
gst_bin_get_all_by_interface (GstBin * bin, GType interface)
GstIterator *
gst_bin_iterate_all_by_interface (GstBin * bin, GType interface)
{
GList *walk;
GList *ret = NULL;
GstIterator *children;
GstIterator *result;
g_return_val_if_fail (GST_IS_BIN (bin), NULL);
g_return_val_if_fail (G_TYPE_IS_INTERFACE (interface), NULL);
GST_LOCK (bin);
for (walk = bin->children; walk; walk = g_list_next (walk)) {
if (G_TYPE_CHECK_INSTANCE_TYPE (walk->data, interface)) {
ret = g_list_prepend (ret, walk->data);
}
if (GST_IS_BIN (walk->data)) {
ret = g_list_concat (ret,
gst_bin_get_all_by_interface (GST_BIN (walk->data), interface));
}
}
GST_UNLOCK (bin);
children = gst_bin_iterate_recurse (bin);
result = gst_iterator_filter (children, GINT_TO_POINTER (interface),
(GCompareFunc) compare_interface);
return ret;
return result;
}
#ifndef GST_DISABLE_LOADSAVE

View file

@ -98,6 +98,9 @@ GstElement* gst_bin_get_by_interface (GstBin *bin, GType interface);
/* retrieve multiple children */
GstIterator* gst_bin_iterate_elements (GstBin *bin);
GstIterator* gst_bin_iterate_recurse (GstBin *bin);
GstIterator* gst_bin_iterate_recurse_up (GstBin *bin);
GstIterator* gst_bin_iterate_sinks (GstBin *bin);
GstIterator* gst_bin_iterate_all_by_interface (GstBin *bin, GType interface);

View file

@ -195,6 +195,8 @@ gst_bus_post (GstBus * bus, GstMessage * message)
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
GST_DEBUG_OBJECT (bus, "posting message on bus");
GST_LOCK (bus);
handler = bus->sync_handler;
handler_data = bus->sync_handler_data;

View file

@ -508,6 +508,34 @@ gst_clock_get_resolution (GstClock * clock)
return G_GINT64_CONSTANT (1);
}
/**
* gst_clock_adjust_unlocked
* @clock: a #GstClock to use
* @internal: a clock time
*
* Converts the given @internal clock time to the real time, adjusting
* and making sure that the returned time is increasing.
* This function should be called with the clock lock held.
*
* Returns: the converted time of the clock.
*
* MT safe.
*/
GstClockTime
gst_clock_adjust_unlocked (GstClock * clock, GstClockTime internal)
{
GstClockTime ret;
ret = internal + clock->adjust;
/* make sure the time is increasing, else return last_time */
if ((gint64) ret < (gint64) clock->last_time) {
ret = clock->last_time;
} else {
clock->last_time = ret;
}
return ret;
}
/**
* gst_clock_get_time
* @clock: a #GstClock to query
@ -535,17 +563,16 @@ gst_clock_get_time (GstClock * clock)
} else {
ret = G_GINT64_CONSTANT (0);
}
GST_CAT_DEBUG (GST_CAT_CLOCK, "internal time %" GST_TIME_FORMAT,
GST_TIME_ARGS (ret));
/* make sure the time is increasing, else return last_time */
GST_LOCK (clock);
ret += clock->adjust;
if ((gint64) ret < (gint64) clock->last_time) {
ret = clock->last_time;
} else {
clock->last_time = ret;
}
ret = gst_clock_adjust_unlocked (clock, ret);
GST_UNLOCK (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "adjusted time %" GST_TIME_FORMAT,
GST_TIME_ARGS (ret));
return ret;
}

View file

@ -179,6 +179,8 @@ guint64 gst_clock_get_resolution (GstClock *clock);
GstClockTime gst_clock_get_time (GstClock *clock);
void gst_clock_set_time_adjust (GstClock *clock, GstClockTime adjust);
GstClockTime gst_clock_adjust_unlocked (GstClock *clock, GstClockTime internal);
/* creating IDs that can be used to get notifications */
GstClockID gst_clock_new_single_shot_id (GstClock *clock,

View file

@ -1,6 +1,6 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
* 2004 Wim Taymans <wim@fluendo.com>
*
* gstcompat.h: backwards compatibility stuff
*
@ -28,57 +28,6 @@
G_BEGIN_DECLS
#ifndef GST_DISABLE_DEPRECATED
/* 0.5.2 changes; remove these ASAP */
/* element functions */
#define gst_element_connect(a,b) gst_element_link(a,b)
#define gst_element_connect_pads(a,b,c,d) \
gst_element_link_pads(a,b,c,d)
#ifdef G_HAVE_ISO_VARARGS
#define gst_element_connect_many(a,...) gst_element_link_many(a,__VA_ARGS__)
#elif defined(G_HAVE_GNUC_VARARGS)
#define gst_element_connect_many(a,args...) \
gst_element_link_many(a, ## args)
#else
/* FIXME: need an inline function */
#endif
#define gst_element_connect_filtered(a,b,c) \
gst_element_link_filtered(a,b,c)
#define gst_element_disconnect(a,b) gst_element_unlink(a,b)
/* pad functions */
#define gst_pad_connect(a,b) gst_pad_link(a,b)
#define gst_pad_connect_filtered(a,b,c) gst_pad_link_filtered(a,b,c)
#define gst_pad_disconnect(a,b) gst_pad_unlink(a,b)
#define gst_pad_proxy_connect(a,b) gst_pad_proxy_link(a,b)
#define gst_pad_set_connect_function(a,b) \
gst_pad_set_link_function(a,b)
/* pad macros */
#define GST_PAD_IS_CONNECTED(a) GST_PAD_IS_LINKED(a)
/* pad enums */
#define GST_PAD_CONNECT_REFUSED GST_PAD_LINK_REFUSED
#define GST_PAD_CONNECT_DELAYED GST_PAD_LINK_DELAYED
#define GST_PAD_CONNECT_OK GST_PAD_LINK_OK
#define GST_PAD_CONNECT_DONE GST_PAD_LINK_DONE
typedef GstPadLinkReturn GstPadConnectReturn;
/* pad function types */
typedef GstPadLinkFunction GstPadConnectFunction;
/* probably not used */
/*
* GST_RPAD_LINKFUNC
*/
/* 0.8.1.1 removal; remove completely in 0.9 */
/* information messages */
# ifdef G_HAVE_ISO_VARARGS
#define gst_info(...) GST_INFO(__VA_ARGS__)
# elif defined(G_HAVE_GNUC_VARARGS)
#define gst_info(format,args...) GST_INFO(format,##args)
# endif
#endif /* not GST_DISABLE_DEPRECATED */

View file

@ -830,6 +830,13 @@ gst_element_get_pad (GstElement * element, const gchar * name)
return pad;
}
GstIteratorItem
iterate_pad (GstIterator * it, GstPad * pad)
{
gst_object_ref (GST_OBJECT_CAST (pad));
return GST_ITERATOR_ITEM_PASS;
}
/**
* gst_element_iterate_pads:
* @element: a #GstElement to iterate pads of.
@ -853,8 +860,7 @@ gst_element_iterate_pads (GstElement * element)
&element->pads_cookie,
&element->pads,
element,
(GstIteratorRefFunction) gst_object_ref,
(GstIteratorUnrefFunction) gst_object_unref,
(GstIteratorItemFunction) iterate_pad,
(GstIteratorDisposeFunction) gst_object_unref);
GST_UNLOCK (element);
@ -1571,6 +1577,8 @@ gst_element_get_state_func (GstElement * element,
g_return_val_if_fail (GST_IS_ELEMENT (element), FALSE);
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "getting state");
GST_STATE_LOCK (element);
/* we got an error, report immediatly */
if (GST_STATE_ERROR (element))
@ -1578,21 +1586,26 @@ gst_element_get_state_func (GstElement * element,
old_pending = GST_STATE_PENDING (element);
if (old_pending != GST_STATE_VOID_PENDING) {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "wait for pending");
/* we have a pending state change, wait for it to complete */
if (!GST_STATE_TIMED_WAIT (element, timeout)) {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "timeout");
/* timeout triggered */
ret = GST_STATE_ASYNC;
} else {
/* could be success or failure */
if (old_pending == GST_STATE (element)) {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "got success");
ret = GST_STATE_SUCCESS;
} else {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "got failure");
ret = GST_STATE_FAILURE;
}
}
}
/* if nothing is pending anymore we can return SUCCESS */
if (GST_STATE_PENDING (element) == GST_STATE_VOID_PENDING) {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "nothing pending");
ret = GST_STATE_SUCCESS;
}
@ -1602,6 +1615,11 @@ done:
if (pending)
*pending = GST_STATE_PENDING (element);
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"state current: %s, pending: %s",
gst_element_state_get_name (GST_STATE (element)),
gst_element_state_get_name (GST_STATE_PENDING (element)));
GST_STATE_UNLOCK (element);
return ret;
@ -1877,12 +1895,13 @@ restart:
/* we only care about real pads */
if (GST_IS_REAL_PAD (pad)) {
GstRealPad *peer;
gboolean pad_loop;
gboolean pad_loop, pad_get;
gboolean delay = FALSE;
/* see if the pad has a loop function and grab
* the peer */
GST_LOCK (pad);
pad_get = GST_RPAD_GETRANGEFUNC (pad) != NULL;
pad_loop = GST_RPAD_LOOPFUNC (pad) != NULL;
peer = GST_RPAD_PEER (pad);
if (peer)
@ -1890,18 +1909,20 @@ restart:
GST_UNLOCK (pad);
if (peer) {
gboolean peer_loop;
gboolean peer_loop, peer_get;
/* see if the peer has a getrange function */
peer_get = GST_RPAD_GETRANGEFUNC (peer) != NULL;
/* see if the peer has a loop function */
peer_loop = GST_RPAD_LOOPFUNC (peer) != NULL;
/* sinkpads with a loop function are delayed since they
* need the srcpad to be active first */
if (GST_PAD_IS_SINK (pad) && pad_loop) {
if (GST_PAD_IS_SINK (pad) && pad_loop && peer_get) {
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"delaying pad %s", GST_OBJECT_NAME (pad));
delay = TRUE;
} else if (GST_PAD_IS_SRC (pad) && peer_loop) {
} else if (GST_PAD_IS_SRC (pad) && peer_loop && pad_get) {
/* If the pad is a source and the peer has a loop function,
* we can activate the srcpad and then the loopbased sinkpad */
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
@ -2039,7 +2060,6 @@ gst_element_dispose (GObject * object)
/* ref so we don't hit 0 again */
gst_object_ref (GST_OBJECT (object));
gst_element_set_state (element, GST_STATE_NULL);
/* first we break all our links with the ouside */
while (element->pads) {

View file

@ -194,7 +194,7 @@ gst_format_iterate_definitions (void)
g_static_mutex_lock (&mutex);
result = gst_iterator_new_list (g_static_mutex_get_mutex (&mutex),
&_n_values, &_gst_formats, NULL, NULL, NULL, NULL);
&_n_values, &_gst_formats, NULL, NULL, NULL);
g_static_mutex_unlock (&mutex);
return result;

View file

@ -27,14 +27,17 @@ gst_iterator_init (GstIterator * it,
GMutex * lock,
guint32 * master_cookie,
GstIteratorNextFunction next,
GstIteratorItemFunction item,
GstIteratorResyncFunction resync, GstIteratorFreeFunction free)
{
it->lock = lock;
it->master_cookie = master_cookie;
it->cookie = *master_cookie;
it->next = next;
it->item = item;
it->resync = resync;
it->free = free;
it->pushed = NULL;
}
/**
@ -43,6 +46,7 @@ gst_iterator_init (GstIterator * it,
* @lock: pointer to a #GMutex.
* @master_cookie: pointer to a guint32 to protect the iterated object.
* @next: function to get next item
* @item: function to call on each item retrieved
* @resync: function to resync the iterator
* @free: function to free the iterator
*
@ -58,6 +62,7 @@ gst_iterator_new (guint size,
GMutex * lock,
guint32 * master_cookie,
GstIteratorNextFunction next,
GstIteratorItemFunction item,
GstIteratorResyncFunction resync, GstIteratorFreeFunction free)
{
GstIterator *result;
@ -69,7 +74,7 @@ gst_iterator_new (guint size,
g_return_val_if_fail (free != NULL, NULL);
result = g_malloc (size);
gst_iterator_init (result, lock, master_cookie, next, resync, free);
gst_iterator_init (result, lock, master_cookie, next, item, resync, free);
return result;
}
@ -83,8 +88,6 @@ typedef struct _GstListIterator
gpointer owner;
GList **orig;
GList *list; /* pointer in list */
GstIteratorRefFunction reffunc;
GstIteratorUnrefFunction unreffunc;
GstIteratorDisposeFunction freefunc;
} GstListIterator;
@ -95,9 +98,6 @@ gst_list_iterator_next (GstListIterator * it, gpointer * elem)
return GST_ITERATOR_DONE;
*elem = it->list->data;
if (it->reffunc) {
it->reffunc (*elem);
}
it->list = g_list_next (it->list);
return GST_ITERATOR_OK;
@ -139,8 +139,7 @@ gst_iterator_new_list (GMutex * lock,
guint32 * master_cookie,
GList ** list,
gpointer owner,
GstIteratorRefFunction ref,
GstIteratorUnrefFunction unref, GstIteratorDisposeFunction free)
GstIteratorItemFunction item, GstIteratorDisposeFunction free)
{
GstListIterator *result;
@ -149,19 +148,27 @@ gst_iterator_new_list (GMutex * lock,
lock,
master_cookie,
(GstIteratorNextFunction) gst_list_iterator_next,
(GstIteratorItemFunction) item,
(GstIteratorResyncFunction) gst_list_iterator_resync,
(GstIteratorFreeFunction) gst_list_iterator_free);
result->owner = owner;
result->orig = list;
result->list = *list;
result->reffunc = ref;
result->unreffunc = unref;
result->freefunc = free;
return GST_ITERATOR (result);
}
static void
gst_iterator_pop (GstIterator * it)
{
if (it->pushed) {
gst_iterator_free (it->pushed);
it->pushed = NULL;
}
}
/**
* gst_iterator_next:
* @it: The #GstIterator to iterate
@ -181,14 +188,43 @@ gst_iterator_next (GstIterator * it, gpointer * elem)
g_return_val_if_fail (it != NULL, GST_ITERATOR_ERROR);
g_return_val_if_fail (elem != NULL, GST_ITERATOR_ERROR);
restart:
if (it->pushed) {
result = gst_iterator_next (it->pushed, elem);
if (result == GST_ITERATOR_DONE) {
/* we are done with this iterator, pop it and
* fallthrough iterating the main iterator again. */
gst_iterator_pop (it);
} else {
return result;
}
}
if (G_LIKELY (it->lock))
g_mutex_lock (it->lock);
if (G_UNLIKELY (*it->master_cookie != it->cookie)) {
result = GST_ITERATOR_RESYNC;
goto done;
}
result = it->next (it, elem);
if (it->item) {
GstIteratorItem itemres;
itemres = it->item (it, *elem);
switch (itemres) {
case GST_ITERATOR_ITEM_SKIP:
if (G_LIKELY (it->lock))
g_mutex_unlock (it->lock);
goto restart;
case GST_ITERATOR_ITEM_END:
result = GST_ITERATOR_DONE;
break;
case GST_ITERATOR_ITEM_PASS:
break;
}
}
done:
if (G_LIKELY (it->lock))
@ -211,6 +247,8 @@ gst_iterator_resync (GstIterator * it)
{
g_return_if_fail (it != NULL);
gst_iterator_pop (it);
if (G_LIKELY (it->lock))
g_mutex_lock (it->lock);
it->resync (it);
@ -232,9 +270,34 @@ gst_iterator_free (GstIterator * it)
{
g_return_if_fail (it != NULL);
gst_iterator_pop (it);
it->free (it);
}
/**
* gst_iterator_push:
* @it: The #GstIterator to use
* @other: The #GstIterator to push
*
* Pushes @other iterator onto @it. All calls performed on @it are
* forwarded tot @other. If @other returns #GST_ITERATOR_DONE, it is
* popped again and calls are handled by @it again.
*
* This function is mainly used by objects implementing the iterator
* next function to recurse into substructures.
*
* MT safe.
*/
void
gst_iterator_push (GstIterator * it, GstIterator * other)
{
g_return_if_fail (it != NULL);
g_return_if_fail (other != NULL);
it->pushed = other;
}
typedef struct _GstIteratorFilter
{
GstIterator iterator;
@ -345,6 +408,7 @@ gst_iterator_filter (GstIterator * it, gpointer user_data, GCompareFunc func)
result = (GstIteratorFilter *) gst_iterator_new (sizeof (GstIteratorFilter),
it->lock, it->master_cookie,
(GstIteratorNextFunction) filter_next,
(GstIteratorItemFunction) NULL,
(GstIteratorResyncFunction) filter_resync,
(GstIteratorFreeFunction) filter_free);
it->lock = NULL;
@ -381,6 +445,7 @@ gst_iterator_foreach (GstIterator * it, GFunc function, gpointer user_data)
gst_iterator_init (GST_ITERATOR (&filter),
it->lock, it->master_cookie,
(GstIteratorNextFunction) filter_next,
(GstIteratorItemFunction) NULL,
(GstIteratorResyncFunction) filter_resync,
(GstIteratorFreeFunction) filter_uninit);
it->lock = NULL;
@ -421,6 +486,7 @@ gst_iterator_find_custom (GstIterator * it, gpointer user_data,
gst_iterator_init (GST_ITERATOR (&filter),
it->lock, it->master_cookie,
(GstIteratorNextFunction) filter_next,
(GstIteratorItemFunction) NULL,
(GstIteratorResyncFunction) filter_resync,
(GstIteratorFreeFunction) filter_uninit);
it->lock = NULL;

View file

@ -35,11 +35,16 @@ typedef enum {
typedef struct _GstIterator GstIterator;
typedef void (*GstIteratorRefFunction) (gpointer item);
typedef void (*GstIteratorUnrefFunction) (gpointer item);
typedef enum {
GST_ITERATOR_ITEM_SKIP = 0, /* skip item */
GST_ITERATOR_ITEM_PASS = 1, /* return item */
GST_ITERATOR_ITEM_END = 2, /* stop after this item */
} GstIteratorItem;
typedef void (*GstIteratorDisposeFunction) (gpointer owner);
typedef GstIteratorResult (*GstIteratorNextFunction) (GstIterator *it, gpointer *result);
typedef GstIteratorItem (*GstIteratorItemFunction) (GstIterator *it, gpointer item);
typedef void (*GstIteratorResyncFunction) (GstIterator *it);
typedef void (*GstIteratorFreeFunction) (GstIterator *it);
@ -50,9 +55,12 @@ typedef void (*GstIteratorFreeFunction) (GstIterator *it);
struct _GstIterator {
GstIteratorNextFunction next;
GstIteratorItemFunction item;
GstIteratorResyncFunction resync;
GstIteratorFreeFunction free;
GstIterator *pushed; /* pushed iterator */
GMutex *lock;
guint32 cookie; /* cookie of the iterator */
guint32 *master_cookie; /* pointer to guint32 holding the cookie when this
@ -64,6 +72,7 @@ GstIterator* gst_iterator_new (guint size,
GMutex *lock,
guint32 *master_cookie,
GstIteratorNextFunction next,
GstIteratorItemFunction item,
GstIteratorResyncFunction resync,
GstIteratorFreeFunction free);
@ -71,8 +80,7 @@ GstIterator* gst_iterator_new_list (GMutex *lock,
guint32 *master_cookie,
GList **list,
gpointer owner,
GstIteratorRefFunction ref,
GstIteratorUnrefFunction unref,
GstIteratorItemFunction item,
GstIteratorDisposeFunction free);
/* using iterators */
@ -80,6 +88,8 @@ GstIteratorResult gst_iterator_next (GstIterator *it, gpointer *result);
void gst_iterator_resync (GstIterator *it);
void gst_iterator_free (GstIterator *it);
void gst_iterator_push (GstIterator *it, GstIterator *other);
/* special functions that operate on iterators */
void gst_iterator_foreach (GstIterator *it, GFunc function,
gpointer user_data);

View file

@ -420,7 +420,6 @@ gst_object_dispose (GObject * object)
PATCH_REFCOUNT1 (object)
parent_class->dispose (object);
}
/* finalize is called when the object has to free its resources */

View file

@ -464,6 +464,10 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
if (activatefunc) {
gboolean result;
GST_CAT_DEBUG (GST_CAT_PADS,
"calling activate function on pad %s:%s with mode %d",
GST_DEBUG_PAD_NAME (realpad), mode);
/* unlock so element can sync */
GST_UNLOCK (realpad);
result = activatefunc (GST_PAD_CAST (realpad), mode);
@ -519,7 +523,7 @@ gst_pad_is_active (GstPad * pad)
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
result = !!GST_FLAG_IS_SET (realpad, GST_PAD_ACTIVE);
result = GST_FLAG_IS_SET (realpad, GST_PAD_ACTIVE);
GST_UNLOCK (realpad);
return result;

View file

@ -216,11 +216,9 @@ pipeline_bus_handler (GstBus * bus, GstMessage * message,
{
GstBusSyncReply result = GST_BUS_PASS;
gboolean posteos = FALSE;
gboolean locked;
/* we don't want messages from the streaming thread while we're doing the
* state change. We do want them from the state change functions. */
locked = GST_STATE_TRYLOCK (pipeline);
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
@ -242,8 +240,6 @@ pipeline_bus_handler (GstBus * bus, GstMessage * message,
default:
break;
}
if (locked)
GST_STATE_UNLOCK (pipeline);
if (posteos) {
gst_bus_post (bus, gst_message_new_eos (GST_OBJECT (pipeline)));

View file

@ -192,7 +192,7 @@ gst_query_type_iterate_definitions (void)
g_static_mutex_lock (&mutex);
result = gst_iterator_new_list (g_static_mutex_get_mutex (&mutex),
&_n_values, &_gst_queries, NULL, NULL, NULL, NULL);
&_n_values, &_gst_queries, NULL, NULL, NULL);
g_static_mutex_unlock (&mutex);
return result;

View file

@ -125,48 +125,6 @@ gst_scheduler_reset (GstScheduler * sched)
sclass->reset (sched);
}
/**
* gst_scheduler_add_element:
* @sched: the scheduler
* @element: the element to add to the scheduler
*
* Add an element to the scheduler.
*/
void
gst_scheduler_add_element (GstScheduler * sched, GstElement * element)
{
GstSchedulerClass *sclass;
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
sclass = GST_SCHEDULER_GET_CLASS (sched);
if (sclass->add_element)
sclass->add_element (sched, element);
}
/**
* gst_scheduler_remove_element:
* @sched: the scheduler
* @element: the element to remove
*
* Remove an element from the scheduler.
*/
void
gst_scheduler_remove_element (GstScheduler * sched, GstElement * element)
{
GstSchedulerClass *sclass;
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
sclass = GST_SCHEDULER_GET_CLASS (sched);
if (sclass->remove_element)
sclass->remove_element (sched, element);
}
GstTask *
gst_scheduler_create_task (GstScheduler * sched, GstTaskFunction func,
gpointer data)
@ -184,56 +142,6 @@ gst_scheduler_create_task (GstScheduler * sched, GstTaskFunction func,
return result;
}
GstClockReturn gst_clock_id_wait (GstClockID id, GstClockTimeDiff * jitter);
/**
* gst_scheduler_clock_wait:
* @sched: the scheduler
* @element: the element that wants to wait
* @id: the clockid to use
* @jitter: the time difference between requested time and actual time
*
* Wait till the clock reaches a specific time. The ClockID can
* be obtained from #gst_clock_new_single_shot_id.
*
* Returns: the status of the operation
*/
GstClockReturn
gst_scheduler_clock_wait (GstScheduler * sched, GstElement * element,
GstClockID id, GstClockTimeDiff * jitter)
{
GstSchedulerClass *sclass;
g_return_val_if_fail (GST_IS_SCHEDULER (sched), GST_CLOCK_ERROR);
g_return_val_if_fail (id != NULL, GST_CLOCK_ERROR);
sclass = GST_SCHEDULER_GET_CLASS (sched);
if (sclass->clock_wait)
return sclass->clock_wait (sched, element, id, jitter);
else
return gst_clock_id_wait (id, jitter);
}
/**
* gst_scheduler_show:
* @sched: the scheduler
*
* Dump the state of the scheduler
*/
void
gst_scheduler_show (GstScheduler * sched)
{
GstSchedulerClass *sclass;
g_return_if_fail (GST_IS_SCHEDULER (sched));
sclass = GST_SCHEDULER_GET_CLASS (sched);
if (sclass->show)
sclass->show (sched);
}
/*
* Factory stuff starts here
*

View file

@ -63,20 +63,9 @@ struct _GstSchedulerClass {
/* virtual methods */
void (*setup) (GstScheduler *sched);
void (*reset) (GstScheduler *sched);
void (*add_element) (GstScheduler *sched, GstElement * element);
void (*remove_element) (GstScheduler *sched, GstElement * element);
GstTask* (*create_task) (GstScheduler *sched, GstTaskFunction func, gpointer data);
GstClockReturn (*clock_wait) (GstScheduler *sched, GstElement *element,
GstClockID id, GstClockTimeDiff *jitter);
/* for debugging */
void (*show) (GstScheduler *sched);
/* signals */
void (*object_sync) (GstScheduler *sched, GstClock *clock, GstObject *object,
GstClockID id);
gpointer _gst_reserved[GST_PADDING];
};
@ -88,11 +77,6 @@ void gst_scheduler_reset (GstScheduler *sched);
GstTask* gst_scheduler_create_task (GstScheduler *sched, GstTaskFunction func, gpointer data);
GstClockReturn gst_scheduler_clock_wait (GstScheduler *sched, GstElement *element,
GstClockID id, GstClockTimeDiff *jitter);
void gst_scheduler_show (GstScheduler *sched);
/*
* creating schedulers
*

View file

@ -36,6 +36,8 @@ static GstClockTime gst_system_clock_get_internal_time (GstClock * clock);
static guint64 gst_system_clock_get_resolution (GstClock * clock);
static GstClockReturn gst_system_clock_id_wait (GstClock * clock,
GstClockEntry * entry);
static GstClockReturn gst_system_clock_id_wait_unlocked
(GstClock * clock, GstClockEntry * entry);
static GstClockReturn gst_system_clock_id_wait_async (GstClock * clock,
GstClockEntry * entry);
static void gst_system_clock_id_unschedule (GstClock * clock,
@ -232,11 +234,9 @@ gst_system_clock_async_thread (GstClock * clock)
goto next_entry;
}
/* now wait for the entry, it's a bit stupid to release the lock
* here but we have to since the next function grabs the lock... */
GST_UNLOCK (clock);
res = gst_system_clock_id_wait (clock, (GstClockID) entry);
GST_LOCK (clock);
/* now wait for the entry, we already hold the lock */
res = gst_system_clock_id_wait_unlocked (clock, (GstClockID) entry);
switch (res) {
case GST_CLOCK_UNSCHEDULED:
/* entry was unscheduled, move to the next */
@ -308,31 +308,32 @@ gst_system_clock_get_resolution (GstClock * clock)
* MT safe.
*/
static GstClockReturn
gst_system_clock_id_wait (GstClock * clock, GstClockEntry * entry)
gst_system_clock_id_wait_unlocked (GstClock * clock, GstClockEntry * entry)
{
GstClockTime current, target;
gint64 diff;
GstClockTime real, current, target;
GstClockTimeDiff diff;
current = gst_clock_get_time (clock);
diff = GST_CLOCK_ENTRY_TIME (entry) - current;
real = GST_CLOCK_GET_CLASS (clock)->get_internal_time (clock);
target = GST_CLOCK_ENTRY_TIME (entry);
current = gst_clock_adjust_unlocked (clock, real);
diff = target - current;
target = gst_system_clock_get_internal_time (clock) + diff;
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p real_target %" GST_TIME_FORMAT
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p"
" target %" GST_TIME_FORMAT
" now %" GST_TIME_FORMAT
" real %" GST_TIME_FORMAT
" diff %" G_GINT64_FORMAT,
entry,
GST_TIME_ARGS (target),
GST_TIME_ARGS (GST_CLOCK_ENTRY_TIME (entry)),
GST_TIME_ARGS (current), diff);
GST_TIME_ARGS (current), GST_TIME_ARGS (real), diff);
if (diff > 0) {
GTimeVal tv;
GST_TIME_TO_TIMEVAL (target, tv);
GST_LOCK (clock);
while (TRUE) {
/* now wait on the entry, it either times out or the cond is signaled. */
if (!GST_CLOCK_TIMED_WAIT (clock, &tv)) {
@ -349,13 +350,24 @@ gst_system_clock_id_wait (GstClock * clock, GstClockEntry * entry)
break;
}
}
GST_UNLOCK (clock);
} else {
entry->status = GST_CLOCK_EARLY;
}
return entry->status;
}
static GstClockReturn
gst_system_clock_id_wait (GstClock * clock, GstClockEntry * entry)
{
GstClockReturn ret;
GST_LOCK (clock);
ret = gst_system_clock_id_wait_unlocked (clock, entry);
GST_UNLOCK (clock);
return ret;
}
/* Add an entry to the list of pending async waits. The entry is inserted
* in sorted order. If we inserted the entry at the head of the list, we
* need to signal the thread as it might either be waiting on it or waiting

View file

@ -57,7 +57,7 @@ typedef enum {
struct _GstTask {
GstObject object;
/*< public >*/ /* with LOCK */
/*< public >*/ /* with TASK_LOCK */
GstTaskState state;
GCond *cond;

View file

@ -57,7 +57,6 @@ typedef enum
GST_STATE_FAILURE = 0,
GST_STATE_SUCCESS = 1,
GST_STATE_ASYNC = 2,
GST_STATE_BUSY = 3
} GstElementStateReturn;
typedef enum

View file

@ -1,79 +0,0 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
* cothreads_compat.h: Compatibility macros between cothreads packages
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
/* use the old cothreads implementation in gst/cothreads.[ch] */
#if defined(_COTHREADS_OMEGA)
#include "../cothreads.h"
/* the name of this cothreads type */
#define COTHREADS_TYPE omega
#define COTHREADS_NAME "omega"
#define COTHREADS_NAME_CAPITAL "Omega"
/* unify the structs
*
* "cothread" and "cothread_context" need to be defined
*/
typedef struct _cothread_state cothread;
/* define functions
* the macros are prepended with "do_"
*/
#define do_cothreads_init(x) /* NOP */
#define do_cothreads_stackquery(stack,size) cothread_stackquery(stack,size)
#define do_cothread_switch(to) cothread_switch(to)
#define do_cothread_create(new_cothread, context, func, argc, argv) \
G_STMT_START{ \
new_cothread = cothread_create (context); \
if (new_cothread) { \
cothread_setfunc (new_cothread, (func), (argc), (argv)); \
} \
}G_STMT_END
#define do_cothread_setfunc(cothread, context, func, argc, argv) \
cothread_setfunc ((cothread), (func), (argc), (argv))
#define do_cothread_destroy(cothread) cothread_free(cothread)
#define do_cothread_context_init() (cothread_context_init ())
#define do_cothread_context_destroy(context) cothread_context_free (context)
#define do_cothread_get_current(context) (cothread_current())
#define do_cothread_get_main(context) (cothread_current_main())
/* use the gthread-based cothreads implementation */
#elif defined(_COTHREADS_GTHREAD)
#include "gthread-cothreads.h"
/* bail out with an error if no cothreads package is defined */
#else
#error "No cothreads package defined"
#endif

File diff suppressed because it is too large Load diff

View file

@ -1,612 +0,0 @@
/* GStreamer
* Copyright (C) 2004 Martin Soto <martinsoto@users.sourceforge.net>
*
* faircothread.c: High level cothread implementation for the fair scheduler.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <glib.h>
#include <gst/gst.h>
#ifdef _COTHREADS_PTH
#include "pth-cothreads.h"
#else
#include "cothreads_compat.h"
#endif
#include "faircothreads.h"
#if !defined(GST_DISABLE_GST_DEBUG) && defined(FAIRSCHEDULER_USE_GETTID)
#include <sys/types.h>
#include <linux/unistd.h>
_syscall0 (pid_t, gettid)
#endif
GST_DEBUG_CATEGORY_EXTERN (debug_fair_ct);
#define GST_CAT_DEFAULT debug_fair_ct
/*
* Support for Asynchronous Operations
*/
enum
{
ASYNC_OP_CHANGE_STATE = 1,
ASYNC_OP_AWAKE
};
typedef struct _AsyncOp AsyncOp;
typedef struct _AsyncOpChangeState AsyncOpChangeState;
typedef struct _AsyncOpAwake AsyncOpAwake;
struct _AsyncOp
{
int type;
};
struct _AsyncOpChangeState
{
AsyncOp parent;
GstFairSchedulerCothread *ct; /* Cothread whose state will be
changed. */
gint new_state; /* New state for the cothread. */
};
struct _AsyncOpAwake
{
AsyncOp parent;
GstFairSchedulerCothread *ct; /* Cothread to awake. */
gint priority; /* Priority for the cothread. */
};
static gchar *gst_fairscheduler_ct_state_names[] = {
"stopped",
"suspended",
"running"
};
/*
* Helpers
*/
static int
cothread_base_func (int argc, char **argv)
{
GstFairSchedulerCothread *ct;
g_return_val_if_fail (argc >= 1, -1);
ct = (GstFairSchedulerCothread *) argv[0];
GST_INFO ("queue %p: Cothread %p starting", ct->queue, ct);
#ifndef GST_DISABLE_GST_DEBUG
#ifdef FAIRSCHEDULER_USE_GETTID
ct->pid = gettid ();
#else
ct->pid = 0;
#endif
#endif
/* Call the thread function. This looks sort of funny, but there's
no other way I know of doing it. */
switch (argc - 1) {
case 0:
ct->func (ct, NULL);
break;
case 1:
ct->func (ct, argv[1], NULL);
break;
case 2:
ct->func (ct, argv[1], argv[2], NULL);
break;
case 3:
ct->func (ct, argv[1], argv[2], argv[3], NULL);
break;
case 4:
ct->func (ct, argv[1], argv[2], argv[3], argv[4], NULL);
break;
case 5:
ct->func (ct, argv[1], argv[2], argv[3], argv[4], argv[5], NULL);
break;
case 6:
ct->func (ct, argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], NULL);
break;
case 7:
ct->func (ct, argv[1], argv[2], argv[3], argv[4], argv[5],
argv[6], argv[7], NULL);
break;
default:
g_return_val_if_reached (-1);
break;
}
/* After the cothread function is finished, we go to the stopped
state. */
gst_fair_scheduler_cothread_change_state (ct,
GST_FAIRSCHEDULER_CTSTATE_STOPPED);
return 0;
}
static void
cothread_activate (GstFairSchedulerCothread * ct, gint priority)
{
GST_DEBUG ("queue %p: activating cothread %p", ct->queue, ct);
if (priority > 0) {
g_queue_push_head (ct->queue->ct_queue, ct);
} else {
g_queue_push_tail (ct->queue->ct_queue, ct);
}
}
static void
cothread_deactivate (GstFairSchedulerCothread * ct)
{
GList *node;
GST_DEBUG ("queue %p: deactivating cothread %p", ct->queue, ct);
/* Find the node. */
node = g_list_find (ct->queue->ct_queue->head, ct);
if (node == NULL) {
return;
}
if (node->next == NULL) {
g_queue_pop_tail (ct->queue->ct_queue);
} else {
ct->queue->ct_queue->head =
g_list_remove_link (ct->queue->ct_queue->head, node);
}
}
static void
queue_async_op (GstFairSchedulerCothreadQueue * queue, AsyncOp * op)
{
g_mutex_lock (queue->async_mutex);
g_queue_push_tail (queue->async_queue, op);
g_cond_signal (queue->new_async_op);
g_mutex_unlock (queue->async_mutex);
}
/*
* Cothreads API
*/
extern GstFairSchedulerCothreadQueue *
gst_fair_scheduler_cothread_queue_new (void)
{
GstFairSchedulerCothreadQueue *new;
new = g_malloc (sizeof (GstFairSchedulerCothreadQueue));
new->context = NULL;
new->ct_queue = g_queue_new ();
new->async_queue = g_queue_new ();
new->async_mutex = g_mutex_new ();
new->new_async_op = g_cond_new ();
return new;
}
extern void
gst_fair_scheduler_cothread_queue_destroy (GstFairSchedulerCothreadQueue *
queue)
{
GList *iter;
/* Destroy all remaining cothreads. */
for (iter = queue->ct_queue->head; iter != NULL; iter = iter->next) {
gst_fair_scheduler_cothread_destroy (
(GstFairSchedulerCothread *) iter->data);
}
g_queue_free (queue->ct_queue);
for (iter = queue->async_queue->head; iter != NULL; iter = iter->next) {
g_free (iter->data);
}
g_queue_free (queue->async_queue);
g_mutex_free (queue->async_mutex);
g_cond_free (queue->new_async_op);
g_free (queue);
}
extern void
gst_fair_scheduler_cothread_queue_start (GstFairSchedulerCothreadQueue * queue)
{
if (queue->context == NULL) {
do_cothreads_init (NULL);
queue->context = do_cothread_context_init ();
}
}
extern void
gst_fair_scheduler_cothread_queue_stop (GstFairSchedulerCothreadQueue * queue)
{
if (queue->context != NULL) {
do_cothread_context_destroy (queue->context);
}
}
gboolean
gst_fair_scheduler_cothread_queue_iterate (GstFairSchedulerCothreadQueue *
queue)
{
GstFairSchedulerCothread *ct;
g_return_val_if_fail (queue->context != NULL, FALSE);
GST_LOG ("queue %p: iterating", queue);
/* Perform any pending asynchronous operations. Checking the queue
is safe and more efficient without locking the mutex. */
if (!g_queue_is_empty (queue->async_queue)) {
AsyncOp *basic_op;
GST_LOG ("queue %p: processing asynchronous operations", queue);
g_mutex_lock (queue->async_mutex);
while (!g_queue_is_empty (queue->async_queue)) {
basic_op = (AsyncOp *) g_queue_pop_head (queue->async_queue);
switch (basic_op->type) {
case ASYNC_OP_CHANGE_STATE:
{
AsyncOpChangeState *op = (AsyncOpChangeState *) basic_op;
gst_fair_scheduler_cothread_change_state (op->ct, op->new_state);
}
break;
case ASYNC_OP_AWAKE:
{
AsyncOpAwake *op = (AsyncOpAwake *) basic_op;
gst_fair_scheduler_cothread_awake (op->ct, op->priority);
}
break;
default:
g_return_val_if_reached (FALSE);
break;
}
g_free (basic_op);
}
g_mutex_unlock (queue->async_mutex);
}
/* First cothread in the queue (if any) should get control. */
ct = g_queue_peek_head (queue->ct_queue);
if (ct == NULL) {
GTimeVal timeout;
g_get_current_time (&timeout);
g_time_val_add (&timeout, 5000);
/* No cothread available, wait until some other thread queues an
operation. */
g_mutex_lock (queue->async_mutex);
g_cond_timed_wait (queue->new_async_op, queue->async_mutex, &timeout);
g_mutex_unlock (queue->async_mutex);
return FALSE;
}
g_return_val_if_fail (ct->state == GST_FAIRSCHEDULER_CTSTATE_RUNNING, FALSE);
/* Check for a cothread mutex. */
if (ct->mutex != NULL) {
g_mutex_lock (ct->mutex);
ct->mutex = NULL;
}
GST_LOG ("queue %p: giving control to %p", queue, ct);
/* Handle control to the cothread. */
do_cothread_switch (ct->execst);
return TRUE;
}
void
gst_fair_scheduler_cothread_queue_show (GstFairSchedulerCothreadQueue * queue)
{
GList *iter;
GstFairSchedulerCothread *ct;
g_print ("\n Running cothreads (last is active):\n");
for (iter = queue->ct_queue->tail; iter != NULL; iter = iter->prev) {
ct = (GstFairSchedulerCothread *) iter->data;
g_print (" %p: %s (%d)\n", ct, ct->readable_name->str, ct->pid);
}
}
GstFairSchedulerCothread *
gst_fair_scheduler_cothread_new (GstFairSchedulerCothreadQueue * queue,
GstFairSchedulerCtFunc function, gpointer first_arg, ...)
{
GstFairSchedulerCothread *new;
va_list ap;
gpointer arg;
new = g_malloc (sizeof (GstFairSchedulerCothread));
new->queue = queue;
new->func = function;
/* The first parameter is always the cothread structure itself. */
new->argv[0] = (char *) new;
new->argc = 1;
/* Store the parameters. */
va_start (ap, first_arg);
arg = first_arg;
while (new->argc < GST_FAIRSCHEDULER_MAX_CTARGS && arg != NULL) {
new->argv[new->argc] = (char *) arg;
new->argc++;
arg = va_arg (ap, gpointer);
}
/* Make sure we don't have more parameters than we can handle. */
g_return_val_if_fail (arg == NULL, NULL);
/* Creation of the actual execution state is defered to transition
to running/suspended. */
new->execst = NULL;
/* All cothreads are created in the stopped state. */
new->state = GST_FAIRSCHEDULER_CTSTATE_STOPPED;
new->mutex = NULL;
#ifndef GST_DISABLE_GST_DEBUG
new->readable_name = g_string_new ("");
new->pid = 0;
#endif
GST_DEBUG ("queue %p: cothread %p created", queue, new);
return new;
}
void
gst_fair_scheduler_cothread_destroy (GstFairSchedulerCothread * ct)
{
GST_DEBUG ("queue %p: destroying cothread %p", ct->queue, ct);
if (ct->state != GST_FAIRSCHEDULER_CTSTATE_STOPPED) {
cothread_deactivate (ct);
}
if (ct->execst != NULL) {
do_cothread_destroy (ct->execst);
}
#ifndef GST_DISABLE_GST_DEBUG
g_string_free (ct->readable_name, TRUE);
#endif
g_free (ct);
}
void
gst_fair_scheduler_cothread_change_state (GstFairSchedulerCothread * ct,
gint new_state)
{
if (new_state == ct->state) {
return;
}
GST_DEBUG ("queue %p: changing state of %p from %s to %s", ct->queue, ct,
gst_fairscheduler_ct_state_names[ct->state],
gst_fairscheduler_ct_state_names[new_state]);
switch (ct->state) {
case GST_FAIRSCHEDULER_CTSTATE_STOPPED:
/* (Re)Initialize the cothread. */
if (ct->execst == NULL) {
/* Initialize cothread's execution state. */
do_cothread_create (ct->execst, ct->queue->context,
cothread_base_func, ct->argc, ct->argv);
GST_LOG_OBJECT (ct->queue,
"cothread %p has exec state %p", ct, ct->execst);
} else {
/* Reset cothread's execution state. */
do_cothread_setfunc (ct->execst, ct->queue->context,
cothread_base_func, ct->argc, ct->argv);
}
ct->sleeping = FALSE;
if (new_state == GST_FAIRSCHEDULER_CTSTATE_RUNNING) {
cothread_activate (ct, 0);
}
break;
case GST_FAIRSCHEDULER_CTSTATE_RUNNING:
if (!ct->sleeping) {
cothread_deactivate (ct);
}
break;
case GST_FAIRSCHEDULER_CTSTATE_SUSPENDED:
if (new_state == GST_FAIRSCHEDULER_CTSTATE_RUNNING && !ct->sleeping) {
cothread_activate (ct, 0);
}
break;
}
ct->state = new_state;
}
void
gst_fair_scheduler_cothread_change_state_async (GstFairSchedulerCothread * ct,
gint new_state)
{
AsyncOpChangeState *op;
/* Queue an asynchronous operation. */
op = g_new (AsyncOpChangeState, 1);
op->parent.type = ASYNC_OP_CHANGE_STATE;
op->ct = ct;
op->new_state = new_state;
queue_async_op (ct->queue, (AsyncOp *) op);
}
void
gst_fair_scheduler_cothread_sleep (GstFairSchedulerCothreadQueue * queue)
{
gst_fair_scheduler_cothread_sleep_mutex (queue, NULL);
}
/*
* Go to sleep but unblock the mutex while sleeping.
*/
void
gst_fair_scheduler_cothread_sleep_mutex (GstFairSchedulerCothreadQueue * queue,
GMutex * mutex)
{
GstFairSchedulerCothread *ct;
g_return_if_fail (queue->context != NULL);
/* The sleep operation can be invoked when the cothread is already
deactivated. */
ct = gst_fair_scheduler_cothread_current (queue);
if (ct != NULL && ct->execst == do_cothread_get_current (queue->context)) {
ct = g_queue_pop_head (queue->ct_queue);
ct->sleeping = TRUE;
}
ct->mutex = mutex;
if (mutex != NULL) {
g_mutex_unlock (mutex);
}
GST_LOG ("queue %p: cothread going to sleep", queue);
/* Switch back to the main cothread. */
do_cothread_switch (do_cothread_get_main (queue->context));
}
void
gst_fair_scheduler_cothread_yield (GstFairSchedulerCothreadQueue * queue)
{
gst_fair_scheduler_cothread_yield_mutex (queue, NULL);
}
void
gst_fair_scheduler_cothread_yield_mutex (GstFairSchedulerCothreadQueue * queue,
GMutex * mutex)
{
GstFairSchedulerCothread *ct;
g_return_if_fail (queue->context != NULL);
/* The yield operation can be invoked when the cothread is already
deactivated. */
ct = gst_fair_scheduler_cothread_current (queue);
if (ct != NULL && ct->execst == do_cothread_get_current (queue->context)) {
ct = g_queue_pop_head (queue->ct_queue);
g_queue_push_tail (queue->ct_queue, ct);
}
ct->mutex = mutex;
if (mutex != NULL) {
g_mutex_unlock (mutex);
}
GST_LOG ("queue %p: cothread yielding control", queue);
/* Switch back to the main cothread. */
do_cothread_switch (do_cothread_get_main (queue->context));
}
void
gst_fair_scheduler_cothread_awake (GstFairSchedulerCothread * ct, gint priority)
{
g_return_if_fail (ct->state != GST_FAIRSCHEDULER_CTSTATE_STOPPED);
if (!ct->sleeping) {
/* Cothread is already awake. */
return;
}
ct->sleeping = FALSE;
if (ct->state == GST_FAIRSCHEDULER_CTSTATE_RUNNING) {
cothread_activate (ct, priority);
}
}
void
gst_fair_scheduler_cothread_awake_async (GstFairSchedulerCothread * ct,
gint priority)
{
AsyncOpAwake *op;
/* Queue an asynchronous operation. */
op = g_new (AsyncOpAwake, 1);
op->parent.type = ASYNC_OP_AWAKE;
op->ct = ct;
op->priority = priority;
queue_async_op (ct->queue, (AsyncOp *) op);
}
GstFairSchedulerCothread *
gst_fair_scheduler_cothread_current (GstFairSchedulerCothreadQueue * queue)
{
return g_queue_peek_head (queue->ct_queue);
}

View file

@ -1,163 +0,0 @@
/* GStreamer
* Copyright (C) 2004 Martin Soto <martinsoto@users.sourceforge.net>
*
* faircothread.h: High level cothread implementation for the fair scheduler.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __FAIRCOTHREADS_H__
#define __FAIRCOTHREADS_H__
#ifdef _COTHREADS_PTH
#include "pth-cothreads.h"
#else
#define GTHREAD_COTHREADS_NO_DEFINITIONS
#include "cothreads_compat.h"
#endif
typedef struct _GstFairSchedulerCothread GstFairSchedulerCothread;
typedef struct _GstFairSchedulerCothreadQueue GstFairSchedulerCothreadQueue;
/* Possible states of a cothread. */
enum
{
GST_FAIRSCHEDULER_CTSTATE_STOPPED,
GST_FAIRSCHEDULER_CTSTATE_SUSPENDED,
GST_FAIRSCHEDULER_CTSTATE_RUNNING,
};
/* Maximum number of cothread parameters. */
#define GST_FAIRSCHEDULER_MAX_CTARGS 7
/* Cothread function type. */
typedef void (*GstFairSchedulerCtFunc) (GstFairSchedulerCothread * ct,
gpointer first_arg, ...);
struct _GstFairSchedulerCothread {
GstFairSchedulerCothreadQueue *queue;
/* Cothread queue this cothread
belongs to. */
GstFairSchedulerCtFunc func; /* Cothread function. */
char *argv[1 + GST_FAIRSCHEDULER_MAX_CTARGS]; /*
Arguments for the cothread function.
argv[0] is always the cothread
object itself. */
int argc; /* Number of stored parameters. */
cothread *execst; /* Execution state for this cothread. */
gint state; /* Current cothread state. */
gboolean sleeping; /* Is this cothread sleeping? */
GMutex *mutex; /* If not null, a mutex to lock before
giving control to this cothread. */
#ifndef GST_DISABLE_GST_DEBUG
GString *readable_name; /* Readable name for this cothread. */
gint pid; /* Process or thread id associated to
this cothread. */
#endif
};
struct _GstFairSchedulerCothreadQueue {
cothread_context *context; /* Cothread context. */
GQueue *ct_queue; /* Queue of currently running
cothreads. New cothreads are pushed
on the tail. If a cothread is
executing, it is the one in the
head. */
/* Asynchronous support. */
GQueue *async_queue; /* Queue storing asynchronous
operations (operations on the queue
requested potentially from other
threads. */
GMutex *async_mutex; /* Mutex to protect acceses to
async_queue. */
GCond *new_async_op; /* Condition variable to signal the
presence of a new asynchronous
operation in the queue. */
};
extern GstFairSchedulerCothreadQueue *
gst_fair_scheduler_cothread_queue_new (void);
extern void
gst_fair_scheduler_cothread_queue_destroy (
GstFairSchedulerCothreadQueue * queue);
extern void
gst_fair_scheduler_cothread_queue_start (
GstFairSchedulerCothreadQueue * queue);
extern void
gst_fair_scheduler_cothread_queue_stop (
GstFairSchedulerCothreadQueue * queue);
extern gboolean
gst_fair_scheduler_cothread_queue_iterate (
GstFairSchedulerCothreadQueue * queue);
extern void
gst_fair_scheduler_cothread_queue_show (
GstFairSchedulerCothreadQueue * queue);
extern GstFairSchedulerCothread *
gst_fair_scheduler_cothread_new (GstFairSchedulerCothreadQueue * queue,
GstFairSchedulerCtFunc function, gpointer first_arg, ...);
extern void
gst_fair_scheduler_cothread_destroy (GstFairSchedulerCothread * ct);
extern void
gst_fair_scheduler_cothread_change_state (GstFairSchedulerCothread * ct,
gint new_state);
extern void
gst_fair_scheduler_cothread_change_state_async (
GstFairSchedulerCothread * ct, gint new_state);
extern void
gst_fair_scheduler_cothread_sleep (GstFairSchedulerCothreadQueue * queue);
extern void
gst_fair_scheduler_cothread_sleep_mutex (
GstFairSchedulerCothreadQueue * queue, GMutex * mutex);
extern void
gst_fair_scheduler_cothread_yield (GstFairSchedulerCothreadQueue * queue);
extern void
gst_fair_scheduler_cothread_yield_mutex (
GstFairSchedulerCothreadQueue * queue, GMutex * mutex);
extern void
gst_fair_scheduler_cothread_awake (GstFairSchedulerCothread * ct,
gint priority);
extern void
gst_fair_scheduler_cothread_awake_async (GstFairSchedulerCothread * ct,
gint priority);
extern GstFairSchedulerCothread *
gst_fair_scheduler_cothread_current (GstFairSchedulerCothreadQueue * queue);
#endif /* __FAIRCOTHREADS_H__ */

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -1,221 +0,0 @@
/* GStreamer
* Copyright (C) 2003 Benjamin Otte <in7y118@public.uni-hamburg.de>
*
* gthread-cothreads.c: cothreads implemented via GThread for compatibility
* They're probably slooooooow
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GTHREAD_COTHREADS_H__
#define __GTHREAD_COTHREADS_H__
#include <glib.h>
#include <gst/gstthread.h>
/* the name of this cothreads */
#define COTHREADS_TYPE gthread
#define COTHREADS_NAME "gthread"
#define COTHREADS_NAME_CAPITAL "GThread"
/*
* Theory of operation:
* Instead of using cothreads, GThreads and 1 mutex are used.
* Every thread may only run if it holds the mutex. Otherwise it holds its own
* cond which has to be signaled to wakeit up.
*/
/* define "cothread", "cothread_context" and "cothread_func" */
typedef int (*cothread_func) (int, char **);
typedef struct _cothread cothread;
typedef struct _cothread_context cothread_context;
struct _cothread_context {
GSList * cothreads; /* contains all threads but main */
cothread * main;
cothread * current;
GMutex * mutex;
GstThread * gst_thread; /* the GstThread we're running from */
};
struct _cothread {
GThread * thread;
GCond * cond;
cothread_func run;
int argc;
char ** argv;
cothread * creator;
gboolean die;
cothread_context * context;
};
#ifndef GTHREAD_COTHREADS_NO_DEFINITIONS
/* define functions
* Functions starting with "do_" are used by the scheduler.
*/
static void do_cothreads_init (void *unused);
static cothread_context *do_cothread_context_init (void);
static void do_cothread_context_destroy (cothread_context *context);
static cothread * cothread_create (cothread_context *context,
cothread_func func,
int argc,
char **argv);
#define do_cothread_create(new_cothread, context, func, argc, argv) \
G_STMT_START{ \
new_cothread = cothread_create ((context), (func), argc, (char**) (argv)); \
}G_STMT_END
static void do_cothread_switch (cothread *to);
static void do_cothread_destroy (cothread *thread);
#define do_cothread_get_current(context) ((context)->current)
#define do_cothread_get_main(context) ((context)->main)
static void
do_cothreads_init (void *unused)
{
if (!g_thread_supported ()) g_thread_init (NULL);
}
static cothread_context *
do_cothread_context_init (void)
{
cothread_context *ret = g_new0 (cothread_context, 1);
ret->main = g_new0 (cothread, 1);
ret->main->thread = g_thread_self ();
ret->main->cond = g_cond_new ();
ret->main->die = FALSE;
ret->main->context = ret;
ret->mutex = g_mutex_new ();
ret->cothreads = NULL;
ret->current = ret->main;
ret->gst_thread = gst_thread_get_current();
g_mutex_lock (ret->mutex);
return ret;
}
static void
do_cothread_context_destroy (cothread_context *context)
{
g_assert (g_thread_self() == context->main->thread);
while (context->cothreads) {
do_cothread_destroy ((cothread *) context->cothreads->data);
}
g_mutex_unlock (context->mutex);
g_mutex_free (context->mutex);
g_cond_free (context->main->cond);
g_free (context->main);
g_free (context);
}
static void
die (cothread *to_die) {
g_cond_free (to_die->cond);
to_die->context->cothreads = g_slist_remove (to_die->context->cothreads, to_die);
g_free (to_die);
g_thread_exit (to_die);
/* don't unlock the mutex here, the thread waiting for us to die is gonna take it */
}
static gpointer
run_new_thread (gpointer data)
{
cothread *self = (cothread *) data;
g_mutex_lock (self->context->mutex);
g_private_set (gst_thread_current, self->context->gst_thread);
g_cond_signal (self->creator->cond);
g_cond_wait (self->cond, self->context->mutex);
if (self->die)
die (self);
while (TRUE) {
self->run (self->argc, self->argv);
/* compatibility */
do_cothread_switch (do_cothread_get_main (self->context));
}
g_assert_not_reached ();
return NULL;
}
static cothread *
cothread_create (cothread_context *context, cothread_func func, int argc, char **argv)
{
cothread *ret;
if ((ret = g_new (cothread, 1)) == NULL) {
goto out1;
}
ret->cond = g_cond_new ();
ret->run = func;
ret->argc = argc;
ret->argv = argv;
ret->creator = do_cothread_get_current (context);
ret->die = FALSE;
ret->context = context;
context->cothreads = g_slist_prepend (context->cothreads, ret);
ret->thread = g_thread_create (run_new_thread, ret, TRUE, NULL);
if (ret->thread == NULL) goto out2;
g_cond_wait (do_cothread_get_current (context)->cond, context->mutex);
return ret;
out2:
context->cothreads = g_slist_remove (context->cothreads, ret);
g_free (ret);
out1:
return NULL;
}
static void do_cothread_switch (cothread *to)
{
cothread *self = do_cothread_get_current(to->context);
if (self != to) {
self->context->current = to;
g_cond_signal (to->cond);
g_cond_wait (self->cond, self->context->mutex);
if (self->die)
die (self);
}
}
#define do_cothread_setfunc(thread,context,_func,_argc,_argv) G_STMT_START {\
((cothread *)(thread))->run = (_func); \
((cothread *)(thread))->argc = _argc; \
((cothread *)(thread))->argv = _argv; \
}G_STMT_END
static void
do_cothread_destroy (cothread *thread)
{
GThread *join;
cothread_context *context;
g_return_if_fail (thread != thread->context->main);
g_return_if_fail (thread != thread->context->current);
thread->die = TRUE;
join = thread->thread;
context = thread->context;
g_cond_signal (thread->cond);
g_mutex_unlock (thread->context->mutex);
g_thread_join (join);
/* the mutex was locked by the thread that we joined, no need to lock again */
}
#define do_cothread_get_current(context) ((context)->current)
#define do_cothread_get_main(context) ((context)->main)
#endif /* GTHREAD_COTHREADS_NO_DEFINITIONS */
#endif /* __GTHREAD_COTHREADS_H__ */

View file

@ -245,13 +245,8 @@ gst_thread_scheduler_get_type (void)
static void gst_thread_scheduler_setup (GstScheduler * sched);
static void gst_thread_scheduler_reset (GstScheduler * sched);
static void gst_thread_scheduler_add_element (GstScheduler * sched,
GstElement * element);
static void gst_thread_scheduler_remove_element (GstScheduler * sched,
GstElement * element);
static GstTask *gst_thread_scheduler_create_task (GstScheduler * sched,
GstTaskFunction func, gpointer data);
static void gst_thread_scheduler_show (GstScheduler * scheduler);
static void
gst_thread_scheduler_class_init (gpointer klass, gpointer class_data)
@ -260,11 +255,7 @@ gst_thread_scheduler_class_init (gpointer klass, gpointer class_data)
scheduler->setup = gst_thread_scheduler_setup;
scheduler->reset = gst_thread_scheduler_reset;
scheduler->add_element = gst_thread_scheduler_add_element;
scheduler->remove_element = gst_thread_scheduler_remove_element;
scheduler->create_task = gst_thread_scheduler_create_task;
scheduler->clock_wait = NULL;
scheduler->show = gst_thread_scheduler_show;
}
static void
@ -335,31 +326,6 @@ gst_thread_scheduler_reset (GstScheduler * sched)
{
}
static void
gst_thread_scheduler_add_element (GstScheduler * scheduler,
GstElement * element)
{
g_print ("add element\n");
}
static void
gst_thread_scheduler_remove_element (GstScheduler * scheduler,
GstElement * element)
{
GstThreadSchedulerTask *task;;
task = ELEMENT_PRIVATE (element);
if (task) {
g_object_unref (G_OBJECT (task));
ELEMENT_PRIVATE (element) = NULL;;
}
}
static void
gst_thread_scheduler_show (GstScheduler * scheduler)
{
}
static gboolean
plugin_init (GstPlugin * plugin)
{

View file

@ -861,48 +861,6 @@ done:
GST_STREAM_UNLOCK (pad);
}
#if 0
/**
* gst_fakesrc_loop:
* @element: the faksesrc to loop
*
* generate an empty buffer and push it to the next element.
*/
static gboolean
gst_fakesrc_loop (GstPad * pad)
{
GstFakeSrc *src;
const GList *pads;
GstTask *task;
src = GST_FAKESRC (GST_PAD_PARENT (pad));
task = src->task;
pads = GST_ELEMENT (src)->pads;
while (pads) {
GstPad *pad = GST_PAD (pads->data);
GstBuffer *buffer;
GstFlowReturn ret;
ret = gst_fakesrc_get (pad, &buffer);
if (ret != GST_FLOW_OK) {
return FALSE;
}
ret = gst_pad_push (pad, buffer);
if (ret != GST_FLOW_OK) {
return FALSE;
}
if (src->eos) {
return FALSE;
}
pads = g_list_next (pads);
}
return TRUE;
}
#endif
static gboolean
gst_fakesrc_activate (GstPad * pad, GstActivateMode mode)
{

View file

@ -259,6 +259,7 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer)
{
GstBuffer *buf = GST_BUFFER (buffer);
GstIdentity *identity;
GstFlowReturn result = GST_FLOW_OK;
guint i;
g_return_val_if_fail (pad != NULL, GST_FLOW_ERROR);
@ -365,12 +366,12 @@ gst_identity_chain (GstPad * pad, GstBuffer * buffer)
}
identity->bytes_handled += GST_BUFFER_SIZE (buf);
gst_pad_push (identity->srcpad, buf);
result = gst_pad_push (identity->srcpad, buf);
if (identity->sleep_time)
g_usleep (identity->sleep_time);
}
return GST_FLOW_OK;
return result;
}
#if 0

View file

@ -19,14 +19,15 @@
#include <gst/gst.h>
/* tests if gst_bin_get_(all_)by_interface works */
/* tests if gst_bin_iterate_(all_)by_interface works */
gint
main (gint argc, gchar * argv[])
{
GstBin *bin, *bin2;
GList *list;
GstIterator *it;
GstElement *filesrc;
gpointer item;
gst_init (&argc, &argv);
@ -39,20 +40,22 @@ main (gint argc, gchar * argv[])
gst_bin_add (bin, filesrc);
g_assert (gst_bin_get_by_interface (bin, GST_TYPE_URI_HANDLER) == filesrc);
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 1);
g_assert (list->data == (gpointer) filesrc);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (item == (gpointer) filesrc);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
gst_bin_add_many (bin,
gst_element_factory_make ("identity", NULL),
gst_element_factory_make ("identity", NULL),
gst_element_factory_make ("identity", NULL), NULL);
g_assert (gst_bin_get_by_interface (bin, GST_TYPE_URI_HANDLER) == filesrc);
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 1);
g_assert (list->data == (gpointer) filesrc);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (item == (gpointer) filesrc);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
bin2 = bin;
bin = GST_BIN (gst_bin_new (NULL));
@ -62,16 +65,20 @@ main (gint argc, gchar * argv[])
gst_element_factory_make ("identity", NULL),
GST_ELEMENT (bin2), gst_element_factory_make ("identity", NULL), NULL);
g_assert (gst_bin_get_by_interface (bin, GST_TYPE_URI_HANDLER) == filesrc);
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 1);
g_assert (list->data == (gpointer) filesrc);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (item == (gpointer) filesrc);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
gst_bin_add (bin, gst_element_factory_make ("filesrc", NULL));
gst_bin_add (bin2, gst_element_factory_make ("filesrc", NULL));
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 3);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
g_object_unref (bin);
return 0;

View file

@ -7,22 +7,20 @@ do_test (void)
{
GstElement *pipeline;
int i;
gboolean ret;
gst_init (NULL, NULL);
pipeline = gst_parse_launch ("fakesrc ! fakesink", NULL);
g_assert (pipeline != NULL);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
for (i = 0; i < 100; i++) {
ret = gst_bin_iterate (GST_BIN (pipeline));
g_assert (ret);
g_usleep (1000);
g_print ("%s", (i & 1) ? "+" : "-");
}
g_print ("\n");
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
}

View file

@ -13,3 +13,4 @@ test4
bin
locked
parent
stress

View file

@ -1,5 +1,5 @@
include ../Rules
tests_pass = test1 test2 test3 test4 locked parent
tests_pass = test1 test2 test3 test4 locked parent stress
tests_fail =
tests_ignore =

View file

@ -0,0 +1,97 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "unistd.h"
#include <gst/gst.h>
static GRand *myrand;
static GMainLoop *loop;
static gboolean
message_received (GstBus * bus, GstMessage * message, GstPipeline * pipeline)
{
g_print ("message %p\n", message);
if (message->type == GST_MESSAGE_EOS) {
g_print ("EOS!!\n");
if (g_main_loop_is_running (loop))
g_main_loop_quit (loop);
}
gst_message_unref (message);
return TRUE;
}
static gboolean
state_change (GstElement * element)
{
g_usleep (g_rand_int_range (myrand, 100, 600));
g_print ("pause..\n");
gst_element_set_state (element, GST_STATE_PAUSED);
gst_element_get_state (element, NULL, NULL, NULL);
g_print ("done\n");
g_usleep (g_rand_int_range (myrand, 50, 100));
g_print ("play..\n");
gst_element_set_state (element, GST_STATE_PLAYING);
gst_element_get_state (element, NULL, NULL, NULL);
g_print ("done\n");
return TRUE;
}
gint
main (gint argc, gchar * argv[])
{
GstElement *pipeline;
GstElement *fakesrc1, *fakesink1;
GstBus *bus;
gst_init (&argc, &argv);
myrand = g_rand_new ();
pipeline = gst_pipeline_new ("pipeline");
loop = g_main_loop_new (NULL, FALSE);
bus = gst_element_get_bus (pipeline);
gst_bus_add_watch (bus, (GstBusHandler) message_received, pipeline);
gst_object_unref (GST_OBJECT (bus));
fakesrc1 = gst_element_factory_make ("fakesrc", "fakesrc1");
g_object_set (G_OBJECT (fakesrc1), "num_buffers", 1000, NULL);
fakesink1 = gst_element_factory_make ("fakesink", "fakesink1");
gst_bin_add (GST_BIN (pipeline), fakesrc1);
gst_bin_add (GST_BIN (pipeline), fakesink1);
gst_pad_link (gst_element_get_pad (fakesrc1, "src"),
gst_element_get_pad (fakesink1, "sink"));
g_signal_connect (G_OBJECT (pipeline), "deep_notify",
G_CALLBACK (gst_object_default_deep_notify), NULL);
g_idle_add ((GSourceFunc) state_change, pipeline);
g_main_loop_run (loop);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
return 0;
}

View file

@ -19,14 +19,15 @@
#include <gst/gst.h>
/* tests if gst_bin_get_(all_)by_interface works */
/* tests if gst_bin_iterate_(all_)by_interface works */
gint
main (gint argc, gchar * argv[])
{
GstBin *bin, *bin2;
GList *list;
GstIterator *it;
GstElement *filesrc;
gpointer item;
gst_init (&argc, &argv);
@ -39,20 +40,22 @@ main (gint argc, gchar * argv[])
gst_bin_add (bin, filesrc);
g_assert (gst_bin_get_by_interface (bin, GST_TYPE_URI_HANDLER) == filesrc);
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 1);
g_assert (list->data == (gpointer) filesrc);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (item == (gpointer) filesrc);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
gst_bin_add_many (bin,
gst_element_factory_make ("identity", NULL),
gst_element_factory_make ("identity", NULL),
gst_element_factory_make ("identity", NULL), NULL);
g_assert (gst_bin_get_by_interface (bin, GST_TYPE_URI_HANDLER) == filesrc);
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 1);
g_assert (list->data == (gpointer) filesrc);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (item == (gpointer) filesrc);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
bin2 = bin;
bin = GST_BIN (gst_bin_new (NULL));
@ -62,16 +65,20 @@ main (gint argc, gchar * argv[])
gst_element_factory_make ("identity", NULL),
GST_ELEMENT (bin2), gst_element_factory_make ("identity", NULL), NULL);
g_assert (gst_bin_get_by_interface (bin, GST_TYPE_URI_HANDLER) == filesrc);
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 1);
g_assert (list->data == (gpointer) filesrc);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (item == (gpointer) filesrc);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
gst_bin_add (bin, gst_element_factory_make ("filesrc", NULL));
gst_bin_add (bin2, gst_element_factory_make ("filesrc", NULL));
list = gst_bin_get_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (g_list_length (list) == 3);
g_list_free (list);
it = gst_bin_iterate_all_by_interface (bin, GST_TYPE_URI_HANDLER);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_OK);
g_assert (gst_iterator_next (it, &item) == GST_ITERATOR_DONE);
gst_iterator_free (it);
g_object_unref (bin);
return 0;

View file

@ -7,22 +7,20 @@ do_test (void)
{
GstElement *pipeline;
int i;
gboolean ret;
gst_init (NULL, NULL);
pipeline = gst_parse_launch ("fakesrc ! fakesink", NULL);
g_assert (pipeline != NULL);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
for (i = 0; i < 100; i++) {
ret = gst_bin_iterate (GST_BIN (pipeline));
g_assert (ret);
g_usleep (1000);
g_print ("%s", (i & 1) ? "+" : "-");
}
g_print ("\n");
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
}

View file

@ -13,3 +13,4 @@ test4
bin
locked
parent
stress

View file

@ -1,5 +1,5 @@
include ../Rules
tests_pass = test1 test2 test3 test4 locked parent
tests_pass = test1 test2 test3 test4 locked parent stress
tests_fail =
tests_ignore =

97
testsuite/states/stress.c Normal file
View file

@ -0,0 +1,97 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "unistd.h"
#include <gst/gst.h>
static GRand *myrand;
static GMainLoop *loop;
static gboolean
message_received (GstBus * bus, GstMessage * message, GstPipeline * pipeline)
{
g_print ("message %p\n", message);
if (message->type == GST_MESSAGE_EOS) {
g_print ("EOS!!\n");
if (g_main_loop_is_running (loop))
g_main_loop_quit (loop);
}
gst_message_unref (message);
return TRUE;
}
static gboolean
state_change (GstElement * element)
{
g_usleep (g_rand_int_range (myrand, 100, 600));
g_print ("pause..\n");
gst_element_set_state (element, GST_STATE_PAUSED);
gst_element_get_state (element, NULL, NULL, NULL);
g_print ("done\n");
g_usleep (g_rand_int_range (myrand, 50, 100));
g_print ("play..\n");
gst_element_set_state (element, GST_STATE_PLAYING);
gst_element_get_state (element, NULL, NULL, NULL);
g_print ("done\n");
return TRUE;
}
gint
main (gint argc, gchar * argv[])
{
GstElement *pipeline;
GstElement *fakesrc1, *fakesink1;
GstBus *bus;
gst_init (&argc, &argv);
myrand = g_rand_new ();
pipeline = gst_pipeline_new ("pipeline");
loop = g_main_loop_new (NULL, FALSE);
bus = gst_element_get_bus (pipeline);
gst_bus_add_watch (bus, (GstBusHandler) message_received, pipeline);
gst_object_unref (GST_OBJECT (bus));
fakesrc1 = gst_element_factory_make ("fakesrc", "fakesrc1");
g_object_set (G_OBJECT (fakesrc1), "num_buffers", 1000, NULL);
fakesink1 = gst_element_factory_make ("fakesink", "fakesink1");
gst_bin_add (GST_BIN (pipeline), fakesrc1);
gst_bin_add (GST_BIN (pipeline), fakesink1);
gst_pad_link (gst_element_get_pad (fakesrc1, "src"),
gst_element_get_pad (fakesink1, "sink"));
g_signal_connect (G_OBJECT (pipeline), "deep_notify",
G_CALLBACK (gst_object_default_deep_notify), NULL);
g_idle_add ((GSourceFunc) state_change, pipeline);
g_main_loop_run (loop);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
return 0;
}