Bug
Resolution: Unresolved
Major
3.0.2.Final
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
{ "class": "io.debezium.connector.mongodb.MongoDbConnector", "type": "source", "version": "3.0.2.Final" }
What is the connector ERROR?
{ "name": "source_os_inter_scenario_avro_mongodb_for_bigdata", "connector": { "state": "RUNNING", "worker_id": "10.110.24.235:8823" }, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "10.110.24.235:8823", "trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:90)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:37)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:324)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:203)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:842)\nCaused by: io.debezium.DebeziumException: Error while attempting to Reading change stream\n\tat io.debezium.connector.mongodb.connection.MongoDbConnections.lambda$eventSourcingErrorHandler$1(MongoDbConnections.java:53)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:111)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:88)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:84)\n\t... 
9 more\nCaused by: io.debezium.DebeziumException: Unable to fetch change stream events\n\tat io.debezium.connector.mongodb.events.BufferingChangeStreamCursor$EventFetcher.poll(BufferingChangeStreamCursor.java:235)\n\tat io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.pollWithDelay(BufferingChangeStreamCursor.java:405)\n\tat io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.tryNext(BufferingChangeStreamCursor.java:374)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.readChangeStream(MongoDbStreamingChangeEventSource.java:107)\n\tat io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$execute$0(MongoDbStreamingChangeEventSource.java:85)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$execute$0(MongoDbConnection.java:89)\n\tat io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:105)\n\t... 11 more\nCaused by: com.mongodb.MongoCommandException: Command failed with error 280 (ChangeStreamFatalError): 'PlanExecutor error during aggregation :: caused by :: Attempted to resume from a split event fragment, but the event in the resumed stream was not large enough to be split' on server hn-fornix-production-logistic-mongodb-linelv-01:27017. 
The full response is {\"errorLabels\": [\"NonResumableChangeStreamError\"], \"ok\": 0.0, \"errmsg\": \"PlanExecutor error during aggregation :: caused by :: Attempted to resume from a split event fragment, but the event in the resumed stream was not large enough to be split\", \"code\": 280, \"codeName\": \"ChangeStreamFatalError\", \"$clusterTime\": {\"clusterTime\": {\"$timestamp\": {\"t\": 1739753426, \"i\": 124}}, \"signature\": {\"hash\": {\"$binary\": {\"base64\": \"PalUqlSqunvD9Dp2u74sRs0EaBQ=\", \"subType\": \"00\"}}, \"keyId\": 7427895936975634460}}, \"operationTime\": {\"$timestamp\": {\"t\": 1739753426, \"i\": 124}}}\n\tat com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)\n\tat com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:454)\n\tat com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:372)\n\tat com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)\n\tat com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:765)\n\tat com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:76)\n\tat com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:209)\n\tat com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:115)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:83)\n\tat com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:74)\n\tat com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:299)\n\tat com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute(SyncOperationHelper.java:273)\n\tat 
com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$3(SyncOperationHelper.java:191)\n\tat com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$0(SyncOperationHelper.java:127)\n\tat com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)\n\tat com.mongodb.internal.operation.SyncOperationHelper.lambda$withSourceAndConnection$1(SyncOperationHelper.java:126)\n\tat com.mongodb.internal.operation.SyncOperationHelper.withSuppliedResource(SyncOperationHelper.java:152)\n\tat com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection(SyncOperationHelper.java:125)\n\tat com.mongodb.internal.operation.SyncOperationHelper.lambda$executeRetryableRead$4(SyncOperationHelper.java:189)\n\tat com.mongodb.internal.operation.SyncOperationHelper.lambda$decorateReadWithRetries$12(SyncOperationHelper.java:292)\n\tat com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)\n\tat com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:194)\n\tat com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead(SyncOperationHelper.java:176)\n\tat com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:193)\n\tat com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)\n\tat com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource(SyncOperationHelper.java:99)\n\tat com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)\n\tat com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:54)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:153)\n\tat com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)\n\tat 
com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:187)\n\tat io.debezium.connector.mongodb.events.BufferingChangeStreamCursor$EventFetcher.run(BufferingChangeStreamCursor.java:260)\n\t... 5 more\n" } ], "type": "source" }
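The status payload above buries the actual failure under several wrapped exceptions. As a triage aid, here is a minimal sketch that pulls the innermost "Caused by:" line out of each failed task. It assumes the payload has the shape shown above (a "tasks" list whose entries carry "state" and "trace"), as returned by the Kafka Connect REST endpoint GET /connectors/{name}/status. The function name root_causes and the shortened sample trace are illustrative only and are not part of Debezium or Kafka Connect.

```python
def root_causes(status: dict) -> list[str]:
    """Return the innermost 'Caused by:' line for every FAILED task.

    Falls back to the first trace line when a trace has no 'Caused by:' entry.
    """
    causes = []
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        lines = [ln.strip() for ln in task.get("trace", "").splitlines()]
        caused = [ln for ln in lines if ln.startswith("Caused by:")]
        if caused:
            causes.append(caused[-1])  # last entry is the deepest cause
        else:
            causes.append(lines[0] if lines else "")
    return causes


# Shortened, hypothetical stand-in for the status payload in this report.
sample = {
    "name": "source_os_inter_scenario_avro_mongodb_for_bigdata",
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: "
                "An exception occurred in the change event producer. "
                "This connector will be stopped.\n"
                "\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable"
                "(ErrorHandler.java:67)\n"
                "Caused by: io.debezium.DebeziumException: "
                "Unable to fetch change stream events\n"
                "Caused by: com.mongodb.MongoCommandException: "
                "Command failed with error 280 (ChangeStreamFatalError)\n"
            ),
        }
    ],
}

for cause in root_causes(sample):
    print(cause)
```

Applied to the full trace in this report, the innermost cause is the MongoCommandException with error 280 (ChangeStreamFatalError), which the server labels NonResumableChangeStreamError.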
What is the connector configuration?
{
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "producer.override.buffer.memory": "100814346",
  "producer.override.compression.type": "lz4",
  "collection.include.list": "os_inter_scenario.*",
  "topic.creation.default.partitions": "5",
  "connect.max.attempts": "4",
  "mongodb.password": "${file:/etc/kafka-connect/config/secret.properties:mongodb.mongodb_os_inter_scenario.password}",
  "mongodb.connection.string": "mongodb://10.110.98.197:27017,10.110.98.198:27017,10.110.98.199:27017/os_inter_scenario?replicaSet=inter_scenario",
  "tasks.max": "1",
  "max.queue.size.in.bytes": "100814346",
  "capture.mode": "change_streams_update_full",
  "mongodb.user": "${file:/etc/kafka-connect/config/secret.properties:mongodb.mongodb_os_inter_scenario.username}",
  "heartbeat.interval.ms": "300000",
  "topic.creation.default.retention.ms": "259200000",
  "tombstones.on.delete": "false",
  "topic.prefix": "os_inter_scenario_avro_mongodb_bigdata",
  "producer.override.max.request.size": "100814346",
  "errors.max.retries": "5",
  "name": "source_os_inter_scenario_avro_mongodb_for_bigdata",
  "topic.creation.default.replication.factor": "3",
  "cursor.oversize.handling.mode": "split",
  "database.include.list": "os_inter_scenario",
  "snapshot.mode": "never"
}
What is the captured database version and mode of deployment?
On-premises MongoDB 7.0.2
Do you have the connector logs, ideally from start till finish?