Proper use of parallelPipeline

Dear all,
I’m using parallelPipeline to handle parallel processing of a workflow.
The utility automagically maps a series of inputSpecs onto one or more outputSpecs.
However, within my device, I need to snapshot the output of one process to the correct outputSpec. What is the proper way to achieve this?
If I have a one-to-one correspondence between inputSpec and outputSpec, I can do it by getting the data header of the input and using its subSpecification for the output, e.g.:

auto dh = it.o2DataHeader(); // header of the current input
// reuse the input subSpecification for the output
pc.outputs().snapshot(o2::framework::Output{header::gDataOriginMID, "DECODED", dh->subSpecification, o2::framework::Lifetime::Timeframe}, data);

But what’s the proper way to do it in case I have multiple inputs to one output? Should I pass a kind of map to my task and then use it to determine the output from the input, or is there a better way?

Thanks in advance for further hints,
best regards,

Are you sure you don’t need to use parallel / mergeInputs?

That said, have you tried to see if simply using --pipeline <device>:<N> is sufficient for you? It will parallelise a given device based on the timeframe id / timestamp.

Hi @eulisse ,
I had a quick look. If I understand correctly, parallel allows for a more flexible configuration of the processor spec w.r.t. parallelPipeline (de facto, I have to define the amendCallback myself, right? While this is already done in parallelPipeline). The use of mergeInputs is instead a little bit more obscure to me.
Still, my issue remains: inside the processor, I need to know the subSpec of the output, since I have to specify it in pc.outputs().snapshot.

Your second solution is interesting: is there a doc/working example?

However, my use case is that I get the output from 2 CRUs, each one reading a side of the detector. Those sides can be processed independently up to clusterization. So it would make sense to have two parallel workflows, each one processing the data of one CRU, instead of parallelising by timeframe.


A working example for 2. is:

o2-testworkflows-diamond-workflow --pipeline B:2

Device “B” will be split into B_t0 and B_t1, one handling odd timeframes and the other even ones.

For 1., you need to get the DeviceSpec const& from the context and from there navigate to outputs[0].matcher. If you show me some code I can be more precise.

Ciao @eulisse ,
ok, thanks for the info.
For what concerns the code, I use it here:

At the moment I can either launch a separate device per GBT link:

and in this case I send the output to the same subSpec as the input, so I managed to figure out what to do.
But the real thing would be to process 16 GBT links and send the output to one spec:

In this case, I pass the output spec as a parameter to my device… but that won’t work if I parallelise, since I can amend the output callback but not the task itself…