@@ -34,244 +34,5 @@ composer require flix-tech/avro-serde-php "~1.4"
3434## Credits
3535This library was inspired by [ jobcloud/php-kafka-lib] ( https://github.com/jobcloud/php-kafka-lib ) :heart_eyes :
3636
37- ## Usage
38-
39- ### Simple Producer
40- ``` php
41- <?php
42-
43- use PhpKafka\Message\KafkaProducerMessage;
44- use PhpKafka\Producer\KafkaProducerBuilder;
45-
46- $producer = KafkaProducerBuilder::create()
47- ->withAdditionalBroker('localhost:9092')
48- ->build();
49-
50- $message = KafkaProducerMessage::create('test-topic', 0)
51- ->withKey('asdf-asdf-asfd-asdf')
52- ->withBody('some test message payload')
53- ->withHeaders([ 'key' => 'value' ]);
54-
55- $producer->produce($message);
56-
57- // Shutdown producer, flush messages that are in queue. Give up after 20s
58- $result = $producer->flush(20000);
59- ```
60-
61- ### Transactional producer
62- ``` php
63- <?php
64-
65- use PhpKafka\Message\KafkaProducerMessage;
66- use PhpKafka\Producer\KafkaProducerBuilder;
67- use PhpKafka\Exception\KafkaProducerTransactionRetryException;
68- use PhpKafka\Exception\KafkaProducerTransactionAbortException;
69- use PhpKafka\Exception\KafkaProducerTransactionFatalException;
70-
71- $producer = KafkaProducerBuilder::create()
72- ->withAdditionalBroker('localhost:9092')
73- ->build();
74-
75- $message = KafkaProducerMessage::create('test-topic', 0)
76- ->withKey('asdf-asdf-asfd-asdf')
77- ->withBody('some test message payload')
78- ->withHeaders([ 'key' => 'value' ]);
79- try {
80- $producer->beginTransaction(10000);
81- $producer->produce($message);
82- $producer->commitTransaction(10000);
83- } catch (KafkaProducerTransactionRetryException $e) {
84- // something went wrong but you can retry the failed call (either beginTransaction or commitTransaction)
85- } catch (KafkaProducerTransactionAbortException $e) {
86- // you need to call $producer->abortTransaction(10000); and try again
87- } catch (KafkaProducerTransactionFatalException $e) {
88- // something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees
89- }
90-
91- // Shutdown producer, flush messages that are in queue. Give up after 20s
92- $result = $producer->flush(20000);
93- ```
94-
95- ### Avro Producer
96- To create an avro prodcuer add the avro encoder.
97-
98- ``` php
99- <?php
100-
101- use FlixTech\AvroSerializer\Objects\RecordSerializer;
102- use PhpKafka\Message\KafkaProducerMessage;
103- use PhpKafka\Message\Encoder\AvroEncoder;
104- use PhpKafka\Message\Registry\AvroSchemaRegistry;
105- use PhpKafka\Producer\KafkaProducerBuilder;
106- use PhpKafka\Message\KafkaAvroSchema;
107- use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
108- use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
109- use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
110- use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
111- use GuzzleHttp\Client;
112-
113- $cachedRegistry = new CachedRegistry(
114- new BlockingRegistry(
115- new PromisingRegistry(
116- new Client(['base_uri' => 'kafka-schema-registry:9081'])
117- )
118- ),
119- new AvroObjectCacheAdapter()
120- );
121-
122- $registry = new AvroSchemaRegistry($cachedRegistry);
123- $recordSerializer = new RecordSerializer($cachedRegistry);
124-
125- //if no version is defined, latest version will be used
126- //if no schema definition is defined, the appropriate version will be fetched form the registry
127- $registry->addBodySchemaMappingForTopic(
128- 'test-topic',
129- new KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
130- );
131- $registry->addKeySchemaMappingForTopic(
132- 'test-topic',
133- new KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
134- );
135-
136- // if you are only encoding key or value, you can pass that mode as additional third argument
137- // per default both key and body will get encoded
138- $encoder = new AvroEncoder($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);
139-
140- $producer = KafkaProducerBuilder::create()
141- ->withAdditionalBroker('kafka:9092')
142- ->withEncoder($encoder)
143- ->build();
144-
145- $schemaName = 'testSchema';
146- $version = 1;
147- $message = KafkaProducerMessage::create('test-topic', 0)
148- ->withKey('asdf-asdf-asfd-asdf')
149- ->withBody(['name' => 'someName'])
150- ->withHeaders([ 'key' => 'value' ]);
151-
152- $producer->produce($message);
153-
154- // Shutdown producer, flush messages that are in queue. Give up after 20s
155- $result = $producer->flush(20000);
156- ```
157-
158- ** NOTE:** To improve producer latency you can install the ` pcntl ` extension.
159- The php-simple-kafka-lib already has code in place, similarly described here:
160- https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings
161-
162- ### Simple Consumer
163-
164- ``` php
165- <?php
166-
167- use PhpKafka\Consumer\KafkaConsumerBuilder;
168- use PhpKafka\Exception\KafkaConsumerConsumeException;
169- use PhpKafka\Exception\KafkaConsumerEndOfPartitionException;
170- use PhpKafka\Exception\KafkaConsumerTimeoutException;
171-
172- $consumer = KafkaConsumerBuilder::create()
173- ->withAdditionalConfig(
174- [
175- 'compression.codec' => 'lz4',
176- 'auto.commit.interval.ms' => 500
177- ]
178- )
179- ->withAdditionalBroker('kafka:9092')
180- ->withConsumerGroup('testGroup')
181- ->withAdditionalSubscription('test-topic')
182- ->build();
183-
184- $consumer->subscribe();
185-
186- while (true) {
187- try {
188- $message = $consumer->consume();
189- // your business logic
190- $consumer->commit($message);
191- } catch (KafkaConsumerTimeoutException $e) {
192- //no messages were read in a given time
193- } catch (KafkaConsumerEndOfPartitionException $e) {
194- //only occurs if enable.partition.eof is true (default: false)
195- } catch (KafkaConsumerConsumeException $e) {
196- // Failed
197- }
198- }
199- ```
200-
201- ### Avro Consumer
202- To create an avro consumer add the avro decoder.
203-
204- ``` php
205- <?php
206-
207- use FlixTech\AvroSerializer\Objects\RecordSerializer;
208- use PhpKafka\Messaging\Kafka\Consumer\KafkaConsumerBuilder;
209- use PhpKafka\Exception\KafkaConsumerConsumeException;
210- use PhpKafka\Exception\KafkaConsumerEndOfPartitionException;
211- use PhpKafka\Exception\KafkaConsumerTimeoutException;
212- use PhpKafka\Message\Decoder\AvroDecoder;
213- use PhpKafka\Message\KafkaAvroSchema;
214- use PhpKafka\Message\Registry\AvroSchemaRegistry;
215- use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
216- use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
217- use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
218- use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
219- use GuzzleHttp\Client;
220-
221- $cachedRegistry = new CachedRegistry(
222- new BlockingRegistry(
223- new PromisingRegistry(
224- new Client(['base_uri' => 'kafka-schema-registry:9081'])
225- )
226- ),
227- new AvroObjectCacheAdapter()
228- );
229-
230- $registry = new AvroSchemaRegistry($cachedRegistry);
231- $recordSerializer = new RecordSerializer($cachedRegistry);
232-
233- //if no version is defined, latest version will be used
234- //if no schema definition is defined, the appropriate version will be fetched form the registry
235- $registry->addBodySchemaMappingForTopic(
236- 'test-topic',
237- new KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */)
238- );
239- $registry->addKeySchemaMappingForTopic(
240- 'test-topic',
241- new KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */)
242- );
243-
244- // if you are only decoding key or value, you can pass that mode as additional third argument
245- // per default both key and body will get decoded
246- $decoder = new AvroDecoder($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */);
247-
248- $consumer = KafkaConsumerBuilder::create()
249- ->withAdditionalConfig(
250- [
251- 'compression.codec' => 'lz4',
252- 'auto.commit.interval.ms' => 500
253- ]
254- )
255- ->withDecoder($decoder)
256- ->withAdditionalBroker('kafka:9092')
257- ->withConsumerGroup('testGroup')
258- ->withAdditionalSubscription('test-topic')
259- ->build();
260-
261- $consumer->subscribe();
262-
263- while (true) {
264- try {
265- $message = $consumer->consume();
266- // your business logic
267- $consumer->commit($message);
268- } catch (KafkaConsumerTimeoutException $e) {
269- //no messages were read in a given time
270- } catch (KafkaConsumerEndOfPartitionException $e) {
271- //only occurs if enable.partition.eof is true (default: false)
272- } catch (KafkaConsumerConsumeException $e) {
273- // Failed
274- }
275- }
276- ```
277-
37+ ## Examples
38+ Examples can be found [ here] ( https://github.com/php-kafka/php-kafka-examples/tree/main/src/ext-php-simple-kafka-client/php-simple-kafka-lib )
0 commit comments