Producer–Consumer
pc – generic producer–consumer
The pc module defines the classes needed for the multi-threaded
producer–consumer problem.
The usual use case is to create some kind of thread-safe generator, usually
shared by Producer instances. Each producer obtains a single value in
Producer.produce() (obtaining the value must be thread-safe, as several
producers can request a value at once), processes it, and returns the
result.
Once returned, the result can be consumed by Consumer instances, which
can store their output in some kind of thread-safe result object (several
consumers may store values at the same time).
The ProducerConsumer engine manages the necessary synchronization among
ProducerThread and ConsumerThread threads and
passes the objects produced by Producer.produce() to
Consumer.consume().
The caller should usually:
- override Producer.produce()
- override Consumer.consume()
- define some kind of thread-safe generator, if required, for Producer instances to obtain initial objects
- define some kind of thread-safe result, if required, for Consumer instances to store results
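The steps listed above can be sketched with the standard library alone. The classes below are hypothetical stand-ins and not the summer.pc API: SharedCounter plays the role of the thread-safe generator, ResultStore the thread-safe result, and a queue.Queue with a sentinel object takes the place of the engine. How summer.pc itself signals the end of production is not described here, so the sentinel is an assumption of this sketch.

```python
import queue
import threading

_DONE = object()  # sentinel meaning "no more values" (assumption of this sketch)


class SharedCounter:
    """Thread-safe generator stand-in: hands out 0..limit-1, each value once."""

    def __init__(self, limit):
        self._limit = limit
        self._next = 0
        self._lock = threading.Lock()

    def take(self):
        with self._lock:
            if self._next >= self._limit:
                return _DONE
            value = self._next
            self._next += 1
            return value


class ResultStore:
    """Thread-safe result stand-in: several consumers may add at once."""

    def __init__(self):
        self._items = []
        self._lock = threading.Lock()

    def add(self, item):
        with self._lock:
            self._items.append(item)

    def sorted_items(self):
        with self._lock:
            return sorted(self._items)


def run_pipeline(limit, producer_count=2, consumer_count=2):
    source, results, channel = SharedCounter(limit), ResultStore(), queue.Queue()

    def produce():
        # Obtain a value, process it (here: square it), hand the result over.
        while (value := source.take()) is not _DONE:
            channel.put(value * value)

    def consume():
        # Store every produced object until a sentinel arrives.
        while (item := channel.get()) is not _DONE:
            results.add(item)

    producers = [threading.Thread(target=produce) for _ in range(producer_count)]
    consumers = [threading.Thread(target=consume) for _ in range(consumer_count)]
    for thread in producers + consumers:
        thread.start()
    for thread in producers:
        thread.join()
    for _ in consumers:
        channel.put(_DONE)  # one sentinel per consumer, once all producers finished
    for thread in consumers:
        thread.join()
    return results.sorted_items()
```

Calling run_pipeline(5) collects the squares of 0 to 4. The results are sorted before being returned because the order in which consumers store them is not deterministic.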
You can look in summer.tests.pctest.ProducerConsumerTest for
inspiration.
See the summer.pcg module for a specific implementation of
producer–consumer that takes an iterable as input: several producers
iterate over it and produce values that are consumed by several consumers.
Thread count is determined based on the target hardware (number of available cores + 1). You can experiment with this value. For I/O-heavy tasks you can usually get better performance by increasing the number of threads (as a rule of thumb, try twice the number of cores).
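The rule of thumb above can be written down as a small helper. The function name suggested_thread_count and its io_heavy flag are hypothetical and not part of summer; this is only a sketch of the arithmetic.

```python
import os


def suggested_thread_count(io_heavy=False):
    """Return a starting thread count derived from the number of cores."""
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    if io_heavy:
        return 2 * cores  # rule of thumb for I/O-heavy work
    return cores + 1
```

The returned value could then be passed as producer_thread_count or consumer_thread_count when constructing the engine. Treat it as a starting point for measurement rather than a fixed answer.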
class summer.pc.Producer
Bases: object
Producer object produces whatever is needed. Override the produce() method.

__weakref__
list of weak references to the object (if defined)

class summer.pc.Consumer
Bases: object
Consumer object consumes whatever is produced by Producer. Override the consume() method.

__weakref__
list of weak references to the object (if defined)

class summer.pc.ProducerConsumer(producer: summer.pc.Producer, consumer: summer.pc.Consumer, producer_thread_count=5, consumer_thread_count=5)
Bases: object
ProducerConsumer handles the necessary synchronization among producer and consumer threads.

__init__(producer: summer.pc.Producer, consumer: summer.pc.Consumer, producer_thread_count=5, consumer_thread_count=5)
Creates a ProducerConsumer instance.

run()
Start the producer–consumer engine. Starts the threads and waits for all of them to complete.

object_produced(produced_object: object)
Called by ProducerThread once the object is produced.

object_consumed() → object
Called by ConsumerThread when the object is being consumed.

__weakref__
list of weak references to the object (if defined)

pcg – producer–consumer with generator
The pcg module is a more specific producer–consumer implementation based
on a common use case: iterating in parallel over a collection of
input values and invoking an operation for each item.
Typical usage:

    from summer.pc import Consumer
    from summer.pcg import ProducerConsumerWithGenerator, ProducerWithGenerator


    class MyConsumer(Consumer):

        def __init__(self, progress: Progress):
            self.progress = progress

        def consume(self, produced_object):
            # do whatever is required to do
            self...
            # indicate progress -- for example to some gui listener (progressbar, ...)
            self.progress.next_step()


    if __name__ == "__main__":
        iterable = list(...)
        consumer = MyConsumer(progress)
        pcg = ProducerConsumerWithGenerator(iterable, ProducerWithGenerator(), consumer)
        pcg.run()

Producer is replaced with ProducerWithGenerator, which can usually be
left as is: it automatically iterates over the provided iterable,
returning one value at a time. You can override the
ProducerWithGenerator.produce_from_slice() method, which takes a
single argument: the current iterator value.
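As an illustration of such an override, the sketch below defines a local stand-in for ProducerWithGenerator that has only the default behaviour described here (returning the item unchanged), so the snippet runs without the library. LineLengthProducer is a hypothetical subclass; in real code it would derive from summer.pcg.ProducerWithGenerator instead.

```python
class ProducerWithGenerator:
    """Local stand-in for summer.pcg.ProducerWithGenerator (default behaviour only)."""

    def produce_from_slice(self, generator_slice):
        # Default: pass the current iterator value through unchanged.
        return generator_slice


class LineLengthProducer(ProducerWithGenerator):
    """Hypothetical producer: turns each input string into (text, length)."""

    def produce_from_slice(self, generator_slice):
        text = generator_slice.strip()
        return text, len(text)
```

Whatever produce_from_slice() returns is what the consumer later receives, so the consumer in this sketch would be handed tuples rather than raw strings.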
You can also use the summer.utils.chunks() function to split a
large collection into smaller ones and produce chunks of data, which
reduces contention on the single shared iterator. The
summer.pc.Consumer class then consumes whole chunks rather than single
items, which may improve performance.
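The signature of summer.utils.chunks() is not given here, so the helper below is a hypothetical equivalent written with the standard library. It only shows the idea of handing out lists of items instead of single items.

```python
from itertools import islice


def chunks(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    # islice() takes up to `size` items; an empty list ends the loop.
    while chunk := list(islice(iterator, size)):
        yield chunk
```

With chunked input, each trip to the shared iterator yields a whole list, so threads synchronize once per chunk instead of once per item. The consumer then has to loop over the chunk it receives.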
class summer.pcg.ThreadSafeIterator(iterable: collections.abc.Iterable)
Bases: collections.abc.Iterator
Implements thread-safe iteration over an iterable.

__weakref__
list of weak references to the object (if defined)

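One common way to make iteration thread-safe is to guard each next() call with a lock. The sketch below shows that technique with a hypothetical LockedIterator class; it is not the source of ThreadSafeIterator, whose implementation is not shown here.

```python
import threading
from collections.abc import Iterator


class LockedIterator(Iterator):
    """Hypothetical equivalent: serialises next() calls with a lock."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def __next__(self):
        with self._lock:
            return next(self._iterator)  # StopIteration propagates to the caller


def drain_in_parallel(iterable, thread_count=4):
    """Consume one shared iterator from several threads; return all items seen."""
    shared = LockedIterator(iterable)
    seen = []
    seen_lock = threading.Lock()

    def worker():
        for item in shared:
            with seen_lock:
                seen.append(item)

    threads = [threading.Thread(target=worker) for _ in range(thread_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(seen)
```

Each item is handed to exactly one thread, and every thread stops once the underlying iterator is exhausted. The result is sorted because the order of arrival across threads is not deterministic.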
class summer.pcg.ProducerWithGenerator
Bases: summer.pc.Producer
Specific version of summer.pc.Producer for the ProducerConsumerWithGenerator engine.

produce_from_slice(generator_slice: object) → object
Take a slice and produce whatever needs to be produced. The default implementation just returns the slice (which is handy when we just want to iterate over the provided iterable).
Parameters: generator_slice – single item from the iteration, whatever it may be
Returns: whatever is desired; the default implementation just returns the passed item, which is reasonable if you just want to iterate in parallel over an iterable.
Return type: object

class summer.pcg.ProducerConsumerWithGenerator(iterable: collections.abc.Iterable, producer: summer.pcg.ProducerWithGenerator, consumer: summer.pc.Consumer, producer_thread_count=5, consumer_thread_count=5)
Bases: summer.pc.ProducerConsumer
Specific implementation of summer.pc.ProducerConsumer that adds thread-safe iteration over the provided iterable object, passing single values to ProducerWithGenerator instances one at a time.

__init__(iterable: collections.abc.Iterable, producer: summer.pcg.ProducerWithGenerator, consumer: summer.pc.Consumer, producer_thread_count=5, consumer_thread_count=5)
Creates a ProducerConsumerWithGenerator instance.

class summer.pcg.ProducerThreadWithGenerator(producer_consumer: summer.pcg.ProducerConsumerWithGenerator, producer: summer.pcg.ProducerWithGenerator, generator: summer.pcg.ThreadSafeIterator)
Bases: summer.pc.ProducerThread
Thread executing producer instances (i.e. ProducerWithGenerator).