I'm on v.8.18 and trying to make my first transform script which should correlate a start and end event from a source index and then calculate the process time in ms as the time difference between start and end events and copy a few fields from the end event as well as set @timestamp to matching end event and then store such into a new data stream. intent is to be able to visualize various statistics of process time.
So I've created a index template referencing a desired ILM policy, precreated a data stream of such a template. Created a transform script to correlated start end end events over a common FlowID field, attempting to copy such fields into destination ingesting going through a pipeline to flatten fields and calculate the process time in milliseconds before ingesting into to destination data stream, only transform script seems to fail to ingest data with this error:
[2025-05-11T10:17:46,760][INFO ][o.e.x.t.t.TransformTask ] [node246] [pjp_camel_process_time] updating state for transform to [{"task_state":"started","indexer_state":"stopped","checkpoint":0,"progress":{"docs_indexed":0,"docs_processed":0},"should_stop_at_checkpoint":false,"auth_state":{"timestamp":1746951444646,"status":"green"}}].
[2025-05-11T10:17:46,835][INFO ][o.e.x.t.t.TransformPersistentTasksExecutor] [node246] [pjp_camel_process_time] successfully completed and scheduled task in node operation
[2025-05-11T10:17:46,952][ERROR][o.e.x.t.t.TransformTask ] [node246] [pjp_camel_process_time] transform has failed; experienced: [Failed to index documents into destination index due to permanent error: [org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced [500] failures and at least 1 irrecoverable [org.elasticsearch.script.ScriptException: runtime error; java.lang.IllegalArgumentException: dynamic getter [java.lang.String, value] not found]. Other failures:
[IngestProcessorException] message [org.elasticsearch.ingest.IngestProcessorException: org.elasticsearch.script.ScriptException: runtime error]; org.elasticsearch.script.ScriptException: runtime error; java.lang.IllegalArgumentException: dynamic getter [java.lang.String, value] not found]].
org.elasticsearch.xpack.transform.transforms.BulkIndexingException: Bulk index experienced [500] failures and at least 1 irrecoverable [org.elasticsearch.script.ScriptException: runtime error; java.lang.IllegalArgumentException: dynamic getter [java.lang.String, value] not found]. Other failures:
[IngestProcessorException] message [org.elasticsearch.ingest.IngestProcessorException: org.elasticsearch.script.ScriptException: runtime error]
at org.elasticsearch.xpack.transform.transforms.ClientTransformIndexer.handleBulkResponse(ClientTransformIndexer.java:244) ~[?:?]
at org.elasticsearch.xpack.transform.transforms.ClientTransformIndexer.lambda$doNextBulk$1(ClientTransformIndexer.java:179) ~[?:?]
at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:257) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:33) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:203) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.tasks.TaskManager$1.onResponse(TaskManager.java:197) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.ActionListenerImplementations$RunBeforeActionListener.onResponse(ActionListenerImplementations.java:336) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:33) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.ActionListenerImplementations$MappedActionListener.onResponse(ActionListenerImplementations.java:97) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.ActionListenerImplementations$RunBeforeActionListener.onResponse(ActionListenerImplementations.java:336) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.ActionListenerImplementations$MappedActionListener.onResponse(ActionListenerImplementations.java:97) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.action.bulk.TransportAbstractBulkAction.lambda$processBulkIndexIngestRequest$6(TransportAbstractBulkAction.java:308) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService$1.lambda$doRun$0(IngestService.java:775) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.core.AbstractRefCounted$1.closeInternal(AbstractRefCounted.java:125) ~[elasticsearch-core-8.18.0.jar:?]
at org.elasticsearch.core.AbstractRefCounted.decRef(AbstractRefCounted.java:77) ~[elasticsearch-core-8.18.0.jar:?]
at org.elasticsearch.action.support.RefCountingRunnable.close(RefCountingRunnable.java:113) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService$1.doRun(IngestService.java:847) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:34) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1044) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) ~[elasticsearch-8.18.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1095) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:619) ~[?:?]
at java.lang.Thread.run(Thread.java:1447) ~[?:?]
Caused by: org.elasticsearch.script.ScriptException: runtime error
at org.elasticsearch.painless.PainlessScript.convertToScriptException(PainlessScript.java:91) ~[?:?]
at org.elasticsearch.painless.PainlessScript$Script.execute( ...:1) ~[?:?]
at org.elasticsearch.ingest.common.ScriptProcessor.execute(ScriptProcessor.java:79) ~[?:?]
at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:171) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:146) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:130) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:795) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService.executePipeline(IngestService.java:1118) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService.executePipelines(IngestService.java:970) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService$1.doRun(IngestService.java:842) ~[elasticsearch-8.18.0.jar:?]
... 7 more
Caused by: java.lang.IllegalArgumentException: dynamic getter [java.lang.String, value] not found
at org.elasticsearch.painless.Def.lookupGetter(Def.java:455) ~[?:?]
at org.elasticsearch.painless.DefBootstrap$PIC.lookup(DefBootstrap.java:172) ~[?:?]
at org.elasticsearch.painless.DefBootstrap$PIC.fallback(DefBootstrap.java:233) ~[?:?]
at org.elasticsearch.painless.PainlessScript$Script.execute( ...:107) ~[?:?]
at org.elasticsearch.ingest.common.ScriptProcessor.execute(ScriptProcessor.java:79) ~[?:?]
at org.elasticsearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:171) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.CompoundProcessor.execute(CompoundProcessor.java:146) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.Pipeline.execute(Pipeline.java:130) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestDocument.executePipeline(IngestDocument.java:795) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService.executePipeline(IngestService.java:1118) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService.executePipelines(IngestService.java:970) ~[elasticsearch-8.18.0.jar:?]
at org.elasticsearch.ingest.IngestService$1.doRun(IngestService.java:842) ~[elasticsearch-8.18.0.jar:?]
... 7 more
It seems to suggest that the error is originating from my transform ingest pipeline, only I'm not sure precisely where or why, so hints appreciated.
Here's my ingest pipeline and the transform script:
PUT _ingest/pipeline/pjp_camel_process_time
{
"description": "Flattens transform output and enriches with duration and camel.req fields",
"processors": [
{
"script": {
"description": "Flatten start and end timestamps and copy camel.req",
"lang": "painless",
"source": """
// Get start_time and end_time from nested structure
def startTs = ctx.start_time?.ts?.value;
def endTs = ctx.end_time?.ts?.value;
if (startTs != null && endTs != null) {
ctx.start = startTs;
ctx.end = endTs;
// Set @timestamp to end time
ctx['@timestamp'] = endTs;
// Calculate duration in ms
ctx.process_time_ms = ZonedDateTime.parse(endTs).toInstant().toEpochMilli() - ZonedDateTime.parse(startTs).toInstant().toEpochMilli();
} else {
// If missing timestamps, drop the document
ctx.op = 'noop';
return;
}
// Copy camel_req object from end_time
if (ctx.end_time?.camel_req != null) {
ctx.camel = ['req': ctx.end_time.camel_req];
}
"""
}
},
{
"remove": {
"field": ["start_time", "end_time"],
"ignore_missing": true
}
}
]
}
PUT _transform/pjp_camel_process_time
{
"source": {
"index": "pjp_camel_logs",
"query": {
"bool": {
"filter": [
{
"terms": {
"camel.req.event": ["ExchangeCreatedEvent", "ExchangeCompletedEvent"]
}
}
]
}
}
},
"pivot": {
"group_by": {
"camel.req.flowid": {
"terms": {
"field": "camel.req.flowid"
}
}
},
"aggregations": {
"start_time": {
"filter": {
"term": {
"camel.req.event": "ExchangeCreatedEvent"
}
},
"aggregations": {
"ts": {
"min": {
"field": "@timestamp"
}
}
}
},
"end_time": {
"filter": {
"term": {
"camel.req.event": "ExchangeCompletedEvent"
}
},
"aggregations": {
"ts": {
"max": {
"field": "@timestamp"
}
},
"camel_req": {
"scripted_metric": {
"init_script": "state.camel_req = null;",
"map_script": """
if (state.camel_req == null) {
state.camel_req = new HashMap();
}
if (doc.containsKey('camel.req.exchangeid') && doc['camel.req.exchangeid'].size() > 0) {
state.camel_req.exchangeid = doc['camel.req.exchangeid'].value;
}
if (doc.containsKey('camel.req.routeid') && doc['camel.req.routeid'].size() > 0) {
state.camel_req.routeid = doc['camel.req.routeid'].value;
}
if (doc.containsKey('camel.req.event') && doc['camel.req.event'].size() > 0) {
state.camel_req.event = doc['camel.req.event'].value;
}
if (doc.containsKey('camel.req.flowid') && doc['camel.req.flowid'].size() > 0) {
state.camel_req.flowid = doc['camel.req.flowid'].value;
}
""",
"combine_script": "return state.camel_req;",
"reduce_script": "return states.stream().filter(s -> s != null).findFirst().orElse(null);"
}
}
}
}
}
},
"dest": {
"index": "pjp_camel_process_time-write",
"pipeline": "pjp_camel_process_time"
},
"settings": {
"max_page_search_size": 500
}
}
Let me know if you got any hints or need more info to get further insight in my problem. TIA