Producer–Consumer
pc – generic producer–consumer
The pc module defines the classes needed for the multi-threaded
producer–consumer problem.
The usual use case is to create some kind of thread-safe generator, usually
shared by the Producer instances. Each producer obtains a single value from
it in Producer.produce() (obtaining the value must be thread-safe, as several
producers may request a value at once), processes the value, and returns the
result.
Once returned, the result can be consumed by Consumer instances, and the
outcome can be stored in some kind of thread-safe result (several consumers
may store values at the same time).
The ProducerConsumer engine manages the necessary synchronization among
ProducerThread and ConsumerThread threads and passes the objects produced by
Producer.produce() to Consumer.consume().
The caller should usually:
- override Producer.produce()
- override Consumer.consume()
- define some kind of thread-safe generator, if required, for Producer instances to obtain initial objects
- define some kind of thread-safe result, if required, for Consumer instances to store results
You can look in summer.tests.pctest.ProducerConsumerTest for
inspiration.
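The roles described above can be illustrated with the standard library alone. The following is a minimal sketch, not the summer.pc implementation: SharedCounter, SharedResult and run_pipeline are hypothetical names, and a queue.Queue with a sentinel object stands in for whatever synchronization the ProducerConsumer engine uses internally.

```python
import queue
import threading


class SharedCounter:
    """Hypothetical thread-safe generator: hands out 0, 1, 2, ... below limit."""

    def __init__(self, limit):
        self._lock = threading.Lock()
        self._next = 0
        self._limit = limit

    def take(self):
        # Only one producer at a time may advance the counter.
        with self._lock:
            if self._next >= self._limit:
                return None  # the generator is exhausted
            value = self._next
            self._next += 1
            return value


class SharedResult:
    """Hypothetical thread-safe result: collects consumed values."""

    def __init__(self):
        self._lock = threading.Lock()
        self.values = []

    def store(self, value):
        # Several consumers may call store() at the same time.
        with self._lock:
            self.values.append(value)


def run_pipeline(limit, producer_count=2, consumer_count=2):
    source, result = SharedCounter(limit), SharedResult()
    channel = queue.Queue()  # hands produced objects over to consumers
    done = object()          # sentinel telling a consumer to stop

    def produce():
        while (value := source.take()) is not None:
            channel.put(value * value)  # "process the value"

    def consume():
        while (item := channel.get()) is not done:
            result.store(item)

    producers = [threading.Thread(target=produce) for _ in range(producer_count)]
    consumers = [threading.Thread(target=consume) for _ in range(consumer_count)]
    for thread in producers + consumers:
        thread.start()
    for thread in producers:
        thread.join()
    for _ in consumers:
        channel.put(done)  # one sentinel per consumer, after all real items
    for thread in consumers:
        thread.join()
    return sorted(result.values)
```

Calling run_pipeline(5) squares the values 0 to 4 in the producer threads and collects them in the consumer threads. The result is sorted because the order in which threads finish is not deterministic.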
See the summer.pcg module for a specific implementation of
producer–consumer that takes an iterable as input: several producers
iterate over it and produce values that are consumed by several consumers.
The thread count is determined by the target hardware (number of available cores + 1). You can experiment with this value. For I/O-heavy tasks you can usually get better performance by increasing the number of threads (as a rule of thumb, try twice the number of cores).
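The rule of thumb above can be written down as a small helper. This is only a sketch: suggested_thread_count is a hypothetical name, not part of summer, and it assumes os.cpu_count() is an acceptable way to read the number of cores.

```python
import os


def suggested_thread_count(io_heavy=False):
    """Return cores + 1, or twice the number of cores for I/O-heavy work."""
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    return cores * 2 if io_heavy else cores + 1
```

The returned value could then be passed as the producer_thread_count and consumer_thread_count arguments shown in the ProducerConsumer signature.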
class summer.pc.Producer
    Bases: object

    Producer object produces whatever is needed. Override the produce() method.

    __weakref__
        list of weak references to the object (if defined)

class summer.pc.Consumer
    Bases: object

    Consumer object consumes whatever is produced by Producer. Override the consume() method.

    __weakref__
        list of weak references to the object (if defined)

class summer.pc.ProducerConsumer(producer: summer.pc.Producer, consumer: summer.pc.Consumer, producer_thread_count=9, consumer_thread_count=9)
    Bases: object

    ProducerConsumer handles necessary synchronization among producer and consumer threads. Serves as a mediator among producers and consumers.

    __init__(producer: summer.pc.Producer, consumer: summer.pc.Consumer, producer_thread_count=9, consumer_thread_count=9)
        Creates ProducerConsumer instance.

    run()
        Start the producer–consumer engine. Starts the threads and waits for all of them to complete.

    object_produced(produced_object: object)
        Called by ProducerThread once the object is produced.

    object_consumed() → object
        Called by ConsumerThread when the object is being consumed.

    __weakref__
        list of weak references to the object (if defined)

class summer.pc.ProducerThread(producer_consumer: summer.pc.ProducerConsumer, producer: summer.pc.Producer)
    Bases: threading.Thread

    Thread executing producer objects.

    __init__(producer_consumer: summer.pc.ProducerConsumer, producer: summer.pc.Producer)
        This constructor should always be called with keyword arguments. Arguments are:

        group should be None; reserved for future extension when a ThreadGroup class is implemented.
        target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
        name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
        args is the argument tuple for the target invocation. Defaults to ().
        kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

    run()
        Method representing the thread’s activity.

        You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class summer.pc.ConsumerThread(producer_consumer: summer.pc.ProducerConsumer, consumer: summer.pc.Consumer)
    Bases: threading.Thread

    Thread executing consumer objects.

    __init__(producer_consumer: summer.pc.ProducerConsumer, consumer: summer.pc.Consumer)
        This constructor should always be called with keyword arguments. Arguments are:

        group should be None; reserved for future extension when a ThreadGroup class is implemented.
        target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
        name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
        args is the argument tuple for the target invocation. Defaults to ().
        kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

    run()
        Method representing the thread’s activity.

        You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

pcg – producer–consumer with generator
The pcg module is a more specific producer–consumer implementation built
around a common use case: iterating in parallel over a collection of
input values and invoking an operation for each item.
Typical usage:
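    from summer.pc import Consumer
    from summer.pcg import ProducerConsumerWithGenerator, ProducerWithGenerator


    class MyConsumer(Consumer):
        # Progress is a user-supplied type; it is not defined in this module
        def __init__(self, progress: Progress):
            self.progress = progress

        def consume(self, produced_object):
            # do whatever is required to do
            self...
            # indicate progress -- for example to some gui listener (progressbar, ...)
            self.progress.next_step()


    if __name__ == "__main__":
        iterable = list(...)
        # progress must be created by the caller before this point
        consumer = MyConsumer(progress)
        pcg = ProducerConsumerWithGenerator(iterable, ProducerWithGenerator(), consumer)
        pcg.run()
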
Producer is replaced with ProducerWithGenerator, which can usually be
left as is: it automatically iterates over the provided iterable,
returning one value at a time. You can override the
ProducerWithGenerator.produce_from_slice() method, which takes a
single argument: the current iterator value.
You can also use the summer.utils.chunks() function to split a
large collection into smaller ones and produce chunks of data. This reduces
contention on the single shared iterator, because
summer.pc.Consumer then consumes whole chunks rather than single
items, which may improve performance.
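To make the chunking idea concrete, here is a minimal sketch of splitting a list into fixed-size chunks. split_into_chunks is a hypothetical stand-in written for illustration; the actual signature and behaviour of summer.utils.chunks() are not documented here and may differ.

```python
def split_into_chunks(items, chunk_size):
    """Split a list into consecutive lists of at most chunk_size items.

    Hypothetical helper, not the summer.utils.chunks() implementation.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
```

With chunks as the iterable, each call into the shared iterator hands a producer a whole chunk, so the iterator is accessed once per chunk instead of once per item.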
class summer.pcg.ThreadSafeIterator(iterable: collections.abc.Iterable)
    Bases: collections.abc.Iterator

    Implements thread safe iteration over an iterable.

    __init__(iterable: collections.abc.Iterable)
        Initialize self. See help(type(self)) for accurate signature.

    __weakref__
        list of weak references to the object (if defined)

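One common way to get thread-safe iteration is to guard next() with a lock. The sketch below shows that technique under the assumption that a single lock is sufficient. LockedIterator and drain_in_parallel are hypothetical names, and this is not necessarily how ThreadSafeIterator is implemented.

```python
import threading
from collections.abc import Iterable, Iterator


class LockedIterator(Iterator):
    """Hypothetical lock-guarded iterator; __iter__ comes from the Iterator ABC."""

    def __init__(self, iterable: Iterable):
        self._lock = threading.Lock()
        self._iterator = iter(iterable)

    def __next__(self):
        # Only one thread at a time may advance the underlying iterator.
        with self._lock:
            return next(self._iterator)


def drain_in_parallel(iterable, thread_count=4):
    """Consume one shared iterator from several threads and return what was seen."""
    shared = LockedIterator(iterable)
    seen, seen_lock = [], threading.Lock()

    def worker():
        for item in shared:
            with seen_lock:
                seen.append(item)

    threads = [threading.Thread(target=worker) for _ in range(thread_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(seen)
```

Each item is handed to exactly one worker; once the underlying iterator is exhausted, StopIteration ends the for loop in every thread.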
class summer.pcg.ProducerWithGenerator
    Bases: summer.pc.Producer

    Specific version of summer.pc.Producer for the ProducerConsumerWithGenerator engine.

    produce_from_slice(generator_slice: object) → object
        Take a slice and produce whatever needs to be produced. Default implementation just returns the slice (which is handy in case we just want to iterate over provided iterable).

        Parameters
            generator_slice – single item from iteration, whatever it may be
        Returns
            whatever is desired, default implementation just returns the passed item, which is reasonable if you want just to iterate in parallel over iterable.
        Return type
            object

class summer.pcg.ProducerConsumerWithGenerator(iterable: collections.abc.Iterable, producer: summer.pcg.ProducerWithGenerator, consumer: summer.pc.Consumer, producer_thread_count=9, consumer_thread_count=9)
    Bases: summer.pc.ProducerConsumer

    Specific implementation of summer.pc.ProducerConsumer that adds thread-safe iteration over provided iterable object passing single values to ProducerWithGenerator instances one at a time.

    __init__(iterable: collections.abc.Iterable, producer: summer.pcg.ProducerWithGenerator, consumer: summer.pc.Consumer, producer_thread_count=9, consumer_thread_count=9)
        Creates ProducerConsumerWithGenerator instance.

class summer.pcg.ProducerThreadWithGenerator(producer_consumer: summer.pcg.ProducerConsumerWithGenerator, producer: summer.pcg.ProducerWithGenerator, generator: summer.pcg.ThreadSafeIterator)
    Bases: summer.pc.ProducerThread

    Thread executing producer instances (i.e. ProducerWithGenerator).

    __init__(producer_consumer: summer.pcg.ProducerConsumerWithGenerator, producer: summer.pcg.ProducerWithGenerator, generator: summer.pcg.ThreadSafeIterator)
        This constructor should always be called with keyword arguments. Arguments are:

        group should be None; reserved for future extension when a ThreadGroup class is implemented.
        target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
        name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
        args is the argument tuple for the target invocation. Defaults to ().
        kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

        If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

    run()
        Method representing the thread’s activity.

        You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.