I'm writing a unit test for MyProducer class which produces the message on kafka topic. logic of testing is just mock the constructor of MyProducer class and set the manually created producer object to the instance of MyProducer class and then just verify if library methods of confluent-kafka are getting called correctly or not. I'm stuck in setting the manually created producer object to MyProducer. How can we set the manually created object to instance variable of MyClass by mocking init?
# kafka_producer.py # actual code
class MyProducer(object):
def __init__(self, config: Dict[str, Any]={}):
self.producer = Producer(config)
def produce(self, *args, **kwargs):
pass
# test_kafka.py # testing
def get_producer(*args, **kwargs):
print(args,kwargs)
conf = {
"bootstrap.servers": 'localhost:9093'
}
producer = Producer(conf)
def produce(*args, **kwargs):
pass
class KafkaTest(unittest.TestCase):
def setUp(self):
pass
@mock.patch(
"utils.kafka_producer.MyProducer.__init__",
)
@mock.patch(
"confluent_kafka.Producer.produce",
)
def test_kafka_producer(self, produce, kafka_producer):
produce.side_effect = produce
kafka_producer.side_effect = get_producer
kafkaProducer = MyProducer(
{'bootstrap.servers': os.environ['KAFKA_BOOTSTRAP_SERVERS']}
)
kafkaProducer.produce(fake_kafka_topic, value='test_data')
It is giving error: AttributeError on 'MyProducer' has no attribute 'producer'.
MyProducerbut you are mockingKafkaProducercome. Overall, No one can give answer assuming your problem. Describe the question like you are describing to a novice person.