1919from __future__ import annotations
2020
2121from copy import deepcopy
22+ from typing import Any
2223
2324from confluent_kafka import KafkaException # type: ignore[import-untyped]
2425
3233
3334
3435class SingletonMeta (type ):
35- _instances : dict [type [ SingletonMeta ], SingletonMeta ] = {}
36+ _instances : dict [SingletonMeta , Any ] = {}
3637
37- def __call__ (cls , * args , ** kwargs ) :
38+ def __call__ (cls , * args : Any , ** kwargs : Any ) -> Any :
3839 if cls not in cls ._instances or "clean" in kwargs .keys ():
3940 instance = super ().__call__ (* args , ** kwargs )
4041 cls ._instances [cls ] = instance
@@ -49,7 +50,7 @@ class KafkaStore(metaclass=SingletonMeta):
4950 FIRST_OFFSET = "first_offset"
5051 NEXT_OFFSET = "next_offset"
5152
52- def __init__ (self , clean : bool = False ):
53+ def __init__ (self , clean : bool = False ) -> None :
5354 if clean :
5455 mock_topics .clear ()
5556 offset_store .clear ()
@@ -70,13 +71,13 @@ def get_number_of_partition(topic: str) -> int:
7071 return len (mock_topics [topic ].keys ())
7172
7273 @staticmethod
73- def create_topic (topic : str ):
74+ def create_topic (topic : str ) -> None :
7475 if mock_topics .get (topic , None ) is not None :
7576 raise KafkaException (f"{ topic } exist is fake kafka" )
7677
7778 mock_topics [topic ] = {}
7879
79- def create_partition (self , topic : str , partitions : int ):
80+ def create_partition (self , topic : str , partitions : int ) -> None :
8081 if not self .is_topic_exist (topic = topic ):
8182 self .create_topic (topic = topic )
8283
@@ -92,7 +93,7 @@ def create_partition(self, topic: str, partitions: int):
9293 else :
9394 raise KafkaException ("can not decrease partition of topic" )
9495
95- def remove_topic (self , topic : str ):
96+ def remove_topic (self , topic : str ) -> None :
9697 if not self .is_topic_exist (topic = topic ):
9798 return
9899
@@ -103,22 +104,22 @@ def remove_topic(self, topic: str):
103104 if topic in offset_key :
104105 offset_store .pop (offset_key )
105106
106- def set_first_offset (self , topic : str , partition : int , value : int ):
107+ def set_first_offset (self , topic : str , partition : int , value : int ) -> None :
107108 offset_store_key = self .get_offset_store_key (topic = topic , partition = partition )
108109 first_offset = self .get_partition_first_offset (topic = topic , partition = partition )
109110 next_offset = self .get_partition_next_offset (topic = topic , partition = partition )
110111
111112 if first_offset < value <= next_offset :
112113 offset_store [offset_store_key ][self .FIRST_OFFSET ] = value
113114
114- def _add_next_offset (self , topic : str , partition : int ):
115+ def _add_next_offset (self , topic : str , partition : int ) -> None :
115116 offset_store_key = self .get_offset_store_key (topic = topic , partition = partition )
116117 offset_store [offset_store_key ][self .NEXT_OFFSET ] += 1
117118
118- def get_offset_store_key (self , topic : str , partition : int ):
119+ def get_offset_store_key (self , topic : str , partition : int ) -> str :
119120 return f"{ topic } *{ partition } "
120121
121- def produce (self , message : Message , topic : str , partition : int ):
122+ def produce (self , message : Message , topic : str , partition : int ) -> None :
122123 if not topic :
123124 return
124125
@@ -174,15 +175,15 @@ def number_of_message_in_topic(self, topic: str) -> int:
174175
175176 return count_of_messages
176177
177- def clear_topic_messages (self , topic : str ):
178+ def clear_topic_messages (self , topic : str ) -> None :
178179 for partition in self .partition_list (topic = topic ):
179180 self .clear_partition_messages (topic = topic , partition = partition )
180181
181182 @staticmethod
182- def clear_partition_messages (topic : str , partition : int ):
183+ def clear_partition_messages (topic : str , partition : int ) -> None :
183184 mock_topics [topic ][partition ] = []
184185
185- def reset_offset (self , topic : str , strategy : str = "latest" ):
186+ def reset_offset (self , topic : str , strategy : str = "latest" ) -> None :
186187 for partition in self .partition_list (topic = topic ):
187188 key = self .get_offset_store_key (topic , partition )
188189
@@ -195,6 +196,6 @@ def reset_offset(self, topic: str, strategy: str = "latest"):
195196 offset_store [key ][self .FIRST_OFFSET ] = 0
196197
197198 @staticmethod
198- def fresh ():
199+ def fresh () -> None :
199200 mock_topics .clear ()
200201 offset_store .clear ()
0 commit comments