Downloads Documentation Community Contribute Demo






Show Sidebar
Login | Register

root/openmrs/trunk/src/api/org/openmrs/hl7/HL7InQueueProcessor.java

Revision 4095, 21.3 kB (checked in by catullus, 4 months ago)

Set the svn:eol-style property to CRLF.

  • Property svn:eol-style set to CRLF
Line 
1 /**
2  * The contents of this file are subject to the OpenMRS Public License
3  * Version 1.0 (the "License"); you may not use this file except in
4  * compliance with the License. You may obtain a copy of the License at
5  * http://license.openmrs.org
6  *
7  * Software distributed under the License is distributed on an "AS IS"
8  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
9  * License for the specific language governing rights and limitations
10  * under the License.
11  *
12  * Copyright (C) OpenMRS, LLC.  All Rights Reserved.
13  */
14 package org.openmrs.hl7;
15
16 import org.apache.commons.logging.Log;
17 import org.apache.commons.logging.LogFactory;
18 import org.openmrs.api.context.Context;
19 import org.springframework.transaction.annotation.Transactional;
20
21 import ca.uhn.hl7v2.HL7Exception;
22
23 /**
24  * Processes message in the HL7 inbound queue. Messages are moved into either
25  * the archive or error table depending on success or failure of the processing.
26  *
27  * You may, however, set a global property that causes the processor to ignore
28  * messages regarding unknown patients from a non-local HL7 source. (i.e. those
29  * messages neither go to the archive or the error table.)
30  *
31  * @version 1.0
32  */
33 @Transactional
34 public class HL7InQueueProcessor /* implements Runnable */{
35
36         private final Log log = LogFactory.getLog(this.getClass());
37
38         private HL7Receiver receiver = new HL7Receiver();
39         private static Boolean isRunning = false; // allow only one running
40
41         // processor per JVM
42
43         /**
44          * Empty constructor (requires context to be set using
45          * <code>setContext(Context)</code> method before any other calls are
46          * made)
47          */
48         public HL7InQueueProcessor() {
49         }
50
51         /**
52          * Process a single queue entry from the inbound HL7 queue
53          *
54          * @param hl7InQueue
55          *            queue entry to be processed
56          */
57         public void processHL7InQueue(HL7InQueue hl7InQueue) {
58
59                 if (log.isDebugEnabled())
60                         log.debug("Processing HL7 inbound queue (id="
61                                         + hl7InQueue.getHL7InQueueId() + ",key="
62                                         + hl7InQueue.getHL7SourceKey() + ")");
63
64                 // Parse the HL7 into an HL7Message or abort with failure
65                 String hl7Message = hl7InQueue.getHL7Data();
66                 try {
67                         // Send the inbound HL7 message to our receiver routine for
68                         // processing
69                         if (log.isDebugEnabled())
70                                 log.debug("Sending HL7 message to HL7 receiver");
71                         receiver.processMessage(hl7Message);
72
73                         // Move HL7 inbound queue entry into the archive before exiting
74                         if (log.isDebugEnabled())
75                                 log.debug("Archiving HL7 inbound queue entry");
76                         HL7InArchive hl7InArchive = new HL7InArchive(hl7InQueue);
77                         Context.getHL7Service().createHL7InArchive(hl7InArchive);
78                         if (log.isDebugEnabled())
79                                 log.debug("Removing HL7 message from inbound queue");
80                         Context.getHL7Service().deleteHL7InQueue(hl7InQueue);
81                 } catch (HL7Exception e) {
82                         boolean skipError = false;
83                         log.error("Unable to process hl7inqueue: " + hl7InQueue.getHL7InQueueId(), e);
84                         log.error("Hl7inqueue source: " + hl7InQueue.getHL7Source());
85                         log.error("hl7_processor.ignore_missing_patient_non_local? " + Context.getAdministrationService().getGlobalProperty("hl7_processor.ignore_missing_patient_non_local", "false"));
86                         if (e.getCause() != null && e.getCause().getMessage().equals("Could not resolve patient")
87                                         && !hl7InQueue.getHL7Source().getName().equals("local")
88                                         && Context.getAdministrationService().getGlobalProperty("hl7_processor.ignore_missing_patient_non_local", "false").equals("true") ) {
89                                 skipError = true;
90                         }
91                         if (!skipError)
92                                 setFatalError(hl7InQueue, "Trouble parsing HL7 message ("
93                                                 + hl7InQueue.getHL7SourceKey() + ")", e);
94                         return;
95                 } catch (Exception e) {
96                         setFatalError(hl7InQueue,
97                                         "Exception while attempting to process HL7 In Queue ("
98                                                         + hl7InQueue.getHL7SourceKey() + ")", e);
99                 }
100
101                 // clean up memory after processing each queue entry (otherwise, the
102                 // memory-intensive process may crash or eat up all our memory)
103                 try {
104                         Context.getHL7Service().garbageCollect();
105                 } catch (Exception e) {
106                         log
107                                         .error(
108                                                         "Exception while performing garbagecollect in hl7 inbound processor",
109                                                         e);
110                 }
111
112         }
113
114         /**
115          * Transform the next pending HL7 inbound queue entry. If there are no
116          * pending items in the queue, this method simply returns quietly.
117          *
118          * @return true if a queue entry was processed, false if queue was empty
119          */
120         public boolean processNextHL7InQueue() {
121                 boolean entryProcessed = false;
122                 HL7Service hl7Service = Context.getHL7Service();
123                 HL7InQueue hl7InQueue;
124                 if ((hl7InQueue = hl7Service.getNextHL7InQueue()) != null) {
125                         processHL7InQueue(hl7InQueue);
126                         entryProcessed = true;
127                 }
128                 return entryProcessed;
129         }
130
131         /**
132          * Convenience method to respond to fatal errors by moving the queue entry
133          * into an error bin prior to aborting
134          */
135         private void setFatalError(HL7InQueue hl7InQueue, String error,
136                         Throwable cause) {
137                 HL7InError hl7InError = new HL7InError(hl7InQueue);
138                 hl7InError.setError(error);
139                 hl7InError.setErrorDetails(cause == null ? "" : cause.getMessage());
140                 Context.getHL7Service().createHL7InError(hl7InError);
141                 Context.getHL7Service().deleteHL7InQueue(hl7InQueue);
142                 log.error(error, cause);
143         }
144
145         /*
146          * Run method for processing all entries in the HL7 inbound queue
147          *
148          * public void run() { try { while (processNextHL7InQueue()) { // loop until
149          * queue is empty } } catch (Exception e) { log.error("Error while
150          * processing HL7 inbound queue", e); } }
151          */
152
153         /**
154          * Starts up a thread to process all existing HL7InQueue entries
155          */
156         public void processHL7InQueue() throws HL7Exception {
157                 synchronized (isRunning) {
158                         if (isRunning) {
159                                 log
160                                                 .warn("HL7 processor aborting (another processor already running)");
161                                 return;
162                         }
163                         isRunning = true;
164                 }
165                 try {
166                         log.debug("Start processing hl7 in queue");
167                         while (processNextHL7InQueue()) {
168                                 // loop until queue is empty
169                         }
170                         log.debug("Done processing hl7 in queue");
171                 }
172                 finally {
173                         isRunning = false;
174                 }
175         }
176
177         /*
178          * private static Hashtable<Context, Thread> threadCache = new Hashtable<Context,
179          * Thread>();
180          *
181          * private static Thread getThreadForContext(Context context) { Thread
182          * thread; if (threadCache.containsKey(context)) thread =
183          * threadCache.get(context); else { thread = new Thread(new
184          * HL7InQueueProcessor(context)); threadCache.put(context, thread); } return
185          * thread; }
186          */
187
188         // /**
189         // * Process a single queue entry from the inbound HL7 queue
190         // *
191         // * @param hl7InQueue
192         // * queue entry to be processed
193         // */
194         // public void processHL7InQueue(HL7InQueue hl7InQueue) {
195         //
196         // HL7Message hl7Message;
197         //
198         // // Parse the HL7 into an HL7Message or abort with failure
199         // String hl7Data = hl7InQueue.getHL7Data();
200         // try {
201         // hl7Message = new HL7Message(hl7Data);
202         // } catch (HL7Exception e) {
203         // setFatalError(hl7InQueue, "Error parsing HL7 message", e);
204         // return;
205         // }
206         //
207         // // Check the HL7 version
208         // if (!hl7Message.getVersion().equals("2.5")) {
209         // setFatalError(hl7InQueue, "Unsupported version (2.5 only)", null);
210         // return;
211         // }
212         //
213         // // Branch based on the type of message
214         // String messageType = hl7Message.getMessageType();
215         // if (messageType.equals("ORU"))
216         // processORU(hl7InQueue, hl7Message);
217         // else {
218         // setFatalError(hl7InQueue, "Message type not supported: \""
219         // + messageType + "\"", null);
220         // return;
221         // }
222         //
223         // // clean up memory after processing each queue entry
224         // Context.getHL7Service().garbageCollect();
225         // }
226         //
227         // /**
228         // * Transform the next pending HL7 inbound queue entry. If there are no
229         // * pending items in the queue, this method simply returns quietly.
230         // *
231         // * @return true if a queue entry was processed, false if queue was empty
232         // */
233         // public boolean processNextHL7InQueue() {
234         // boolean entryProcessed = false;
235         // HL7Service hl7Service = Context.getHL7Service();
236         // HL7InQueue hl7InQueue;
237         // if ((hl7InQueue = hl7Service.getNextHL7InQueue()) != null) {
238         // processHL7InQueue(hl7InQueue);
239         // entryProcessed = true;
240         // }
241         // return entryProcessed;
242         // }
243         //
244         // /**
245         // * Process ORU messages (this will eventually be moved into a separate
246         // class
247         // * as we expand to processing more message types ... or be replaced by
248         // part
249         // * of an HL7 library)
250         // *
251         // * @param hl7InQueue
252         // * inbound queue entry to be processed
253         // * @param hl7Message
254         // * message generated from the queue entry
255         // */
256         // public void processORU(HL7InQueue hl7InQueue, HL7Message hl7Message) {
257         //
258         // // Extract the patient segment
259         // HL7Segment pid = hl7Message.getNextSegment("PID");
260         // if (pid == null) {
261         // setFatalError(hl7InQueue, "Expected PID segment", null);
262         // return;
263         // }
264         //
265         // // Extract the visit segment
266         // HL7Segment pv1 = hl7Message.getNextSegment("PV1");
267         // if (pv1 == null) {
268         // setFatalError(hl7InQueue, "Expected PV1 segment", null);
269         // return;
270         // }
271         //
272         // // Extract the common order segment (contains entry time and enterer)
273         // HL7Segment orc = hl7Message.getNextSegment("ORC");
274         // if (orc == null) {
275         // setFatalError(hl7InQueue, "Expected ORC segment", null);
276         // return;
277         // }
278         //
279         // // Determine the enterer for obs data
280         // User enterer = null;
281         // try {
282         // enterer = getEnterer(orc);
283         // } catch (HL7Exception e) {
284         // setFatalError(hl7InQueue, "Unable to determine the data enterer", e);
285         // return;
286         // }
287         //
288         // // Determine the patient from the PID (patient) segment
289         // Patient patient = null;
290         // try {
291         // patient = getPatient(pid);
292         // } catch (HL7Exception e) {
293         // setFatalError(hl7InQueue, "Unable to determine the patient", e);
294         // return;
295         // }
296         //
297         // // Determine the provider from the PV1 (visit) segment
298         // User provider = null;
299         // try {
300         // provider = getProvider(pv1);
301         // } catch (HL7Exception e) {
302         // setFatalError(hl7InQueue, "Unable to determine the provider", e);
303         // return;
304         // }
305         //
306         // // Determine the form used (OpenMRS-specific -- we record the form
307         // // as a specific "profile" in MSH-21)
308         // Form form = null;
309         // try {
310         // form = getForm(hl7Message);
311         // } catch (Exception e) {
312         // setFatalError(hl7InQueue, "Unable to determine OpenMRS form", e);
313         // return;
314         // }
315         //
316         // // Create an encounter for the observations
317         // Encounter encounter = null;
318         // try {
319         // encounter = createEncounter(enterer, form, orc, pid, pv1);
320         // } catch (HL7Exception e) {
321         // setFatalError(hl7InQueue, "Unable to create encounter", e);
322         // return;
323         // }
324         //
325         // // Loop through the observations (grouped in OBRs, there should be
326         // // an OBR segment followed by 0..n OBX with an observation per OBX)
327         // Vector<Error> errors = new Vector<Error>();
328         // HL7Segment obr;
329         // while ((obr = hl7Message.getNextSegment("OBR")) != null) {
330         // HL7Segment obx;
331         // while (hl7Message.hasNextSegment("OBX")) {
332         // obx = hl7Message.getNextSegment();
333         // try {
334         // createObservation(enterer, patient, encounter, provider,
335         // obx);
336         // } catch (HL7Exception e) {
337         // addError(errors, obx.toString(), "OBX" + obx.getField(1)
338         // + " parse error", e.getMessage());
339         // }
340         // }
341         // }
342         // if (errors != null && errors.size() > 0)
343         // recordErrors(hl7InQueue, errors);
344         //
345         // // Move HL7 inbound queue entry into the archive before exiting
346         // HL7InArchive hl7InArchive = new HL7InArchive(hl7InQueue);
347         // Context.getHL7Service().createHL7InArchive(hl7InArchive);
348         // Context.getHL7Service().deleteHL7InQueue(hl7InQueue);
349         // }
350         //
351         // /**
352         // * Creates an encounter
353         // */
354         // private Encounter createEncounter(User enterer, Form form, HL7Segment
355         // orc,
356         // HL7Segment pid, HL7Segment pv1) throws HL7Exception {
357         // Encounter encounter = new Encounter();
358         //
359         // Date encounterDate = HL7Util.parseHL7Date(pv1.getField(44));
360         // Date dateEntered = HL7Util.parseHL7Timestamp(orc.getField(9));
361         // EncounterType encounterType = form.getEncounterType();
362         // Location location = getLocation(pv1);
363         // Patient patient = getPatient(pid);
364         // User provider = getProvider(pv1);
365         //
366         // encounter.setEncounterDatetime(encounterDate);
367         // encounter.setEncounterType(encounterType);
368         // encounter.setForm(form);
369         // encounter.setLocation(location);
370         // encounter.setPatient(patient);
371         // encounter.setProvider(provider);
372         // encounter.setCreator(enterer);
373         // encounter.setDateCreated(dateEntered);
374         // Context.getEncounterService().createEncounter(encounter);
375         //
376         // if (encounter == null || encounter.getEncounterId() == null
377         // || encounter.getEncounterId() == 0) {
378         // throw new HL7Exception("Invalid encounter");
379         // }
380         // return encounter;
381         // }
382         //
383         // /**
384         // * Extracts the data enterer from the ORC segment
385         // */
386         // private User getEnterer(HL7Segment orc) throws HL7Exception {
387         // String[] entererComponents = orc.getComponents(10);
388         // Integer entererId = null;
389         // try {
390         // entererId = Context.getHL7Service().resolveUserId(entererComponents);
391         // } catch (HL7Exception ex) {
392         // throw new HL7Exception("Error retrieving User from ORC.orderer");
393         // }
394         // if (entererId == null) {
395         // throw new HL7Exception("Could not find enterer specified in ORC
396         // segment");
397         // } else {
398         // return Context.getUserService().getUser(entererId);
399         // }
400         // }
401         //
402         // /**
403         // * Extract the patient from the PID segment
404         // */
405         // private Patient getPatient(HL7Segment pid) throws HL7Exception {
406         // Integer ptId;
407         // try {
408         // ptId = Context.getHL7Service().resolvePatientId(pid);
409         // } catch (HL7Exception ex) {
410         // throw new HL7Exception("Error retrieving patient from PID segment", ex);
411         // }
412         // if (ptId == null) {
413         // throw new HL7Exception("Could not find patient specified in PID
414         // segment");
415         // } else {
416         // return Context.getPatientService().getPatient(ptId);
417         // }
418         // }
419         //
420         // /**
421         // * Extracts the form used to record the observations from the MSH segment
422         // * (we store the formId as a MSH profile ID in MSH-21)
423         // */
424         // private Form getForm(HL7Message hl7Message) throws HL7Exception {
425         // String hl7FormId = hl7Message.getProfileId().split(
426         // "\\" + hl7Message.getComponentSeparator())[0];
427         // Integer formId = null;
428         // Form form = null;
429         // try {
430         // formId = Integer.parseInt(hl7FormId);
431         // } catch (NumberFormatException e) {
432         // throw new HL7Exception("Invalid form ID '" + hl7FormId + "'");
433         // }
434         // if (formId == null || formId == 0) {
435         // throw new HL7Exception("Unable to parse OpenMRS form ID '"
436         // + hl7FormId + "'");
437         // }
438         // try {
439         // form = Context.getFormService().getForm(formId);
440         // } catch (Exception e) {
441         // throw new HL7Exception("Error retrieving OpenMRS form " + formId, e);
442         // }
443         // if (form == null || form.getFormId() == null)
444         // throw new HL7Exception("Could not find OpenMRS form " + formId);
445         // return form;
446         // }
447         //
448         // /**
449         // * Extracts the location of the visit from the PV1 segment
450         // */
451         // private Location getLocation(HL7Segment pv1) throws HL7Exception {
452         // String[] locationComponents = pv1.getComponents(3);
453         // Integer locationId = null;
454         // try {
455         // locationId =
456         // Context.getHL7Service().resolveLocationId(locationComponents);
457         // } catch (HL7Exception ex) {
458         // throw new HL7Exception("Error retrieving Location from PV1.'Assigned
459         // Patient Location'", ex);
460         // }
461         // if (locationId == null) {
462         // throw new HL7Exception("Could not find Assigned Patient Location
463         // specified in PV1 segment");
464         // } else {
465         // return Context.getEncounterService().getLocation(locationId);
466         // }
467         // }
468         //
469         // /**
470         // * Extracts the provider from the PV1 (visit) segment
471         // */
472         // private User getProvider(HL7Segment pv1) throws HL7Exception {
473         // String[] providerComponents = pv1.getComponents(7);
474         // Integer providerId = null;
475         // try {
476         // providerId = Context.getHL7Service().resolveUserId(providerComponents);
477         // } catch (HL7Exception ex) {
478         // throw new HL7Exception("Error retrieving User from PV1.provider");
479         // }
480         // if (providerId == null) {
481         // throw new HL7Exception("Could not find provider specified in PV1
482         // segment");
483         // } else {
484         // return Context.getUserService().getUser(providerId);
485         // }
486         // }
487         //
488         // /**
489         // * Creates an observation. If the observed conceptId matches the keyword
490         // for
491         // * proposed concepts, then a ConceptProposal is generated instead.
492         // */
493         // private void createObservation(User enterer, Patient patient,
494         // Encounter encounter, User provider, HL7Segment obx)
495         // throws HL7Exception {
496         // try {
497         // String hl7Datatype = obx.getField(2);
498         // Integer conceptId = Integer.parseInt(obx.getComponent(3, 1));
499         // Concept concept = Context.getConceptService().getConcept(conceptId);
500         // String subId = obx.getField(4);
501         // String value = obx.getField(5);
502         // String[] valueComponents = obx.getComponents(5);
503         // String dateTimeRaw = obx.getField(14);
504         // Date dateTime;
505         // if (dateTimeRaw.length() < 1)
506         // if (encounter != null)
507         // dateTime = encounter.getEncounterDatetime();
508         // else
509         // dateTime = new Date();
510         // else
511         // dateTime = HL7Util.parseHL7Timestamp(obx.getField(14));
512         //
513         // Obs obs = new Obs();
514         // obs.setPatient(patient);
515         // obs.setConcept(concept);
516         // obs.setEncounter(encounter);
517         // obs.setObsDatetime(dateTime);
518         // obs.setLocation(encounter.getLocation());
519         // obs.setCreator(enterer);
520         // if (!"".equals(subId)) {
521         // obs.setObsGroupId(Integer.valueOf(subId));
522         // }
523         // if (encounter != null)
524         // obs.setDateCreated(encounter.getDateCreated());
525         // if (hl7Datatype.equals("NM"))
526         // obs.setValueNumeric(Double.valueOf(value));
527         // else if (hl7Datatype.equals("CWE") || hl7Datatype.equals("CE")) {
528         // if (valueComponents[0]
529         // .equals(OpenmrsConstants.PROPOSED_CONCEPT_IDENTIFIER)) {
530         // proposeConcept(encounter, concept, valueComponents[1],
531         // enterer);
532         // return; // avoid trying to create an obs
533         // } else {
534         // Integer valueConceptId = Integer
535         // .parseInt(valueComponents[0]);
536         // Concept valueConcept = Context.getConceptService()
537         // .getConcept(valueConceptId);
538         // obs.setValueCoded(valueConcept);
539         // }
540         // } else if (hl7Datatype.equals("DT")) {
541         // Date valueDate = HL7Util.parseHL7Date(value);
542         // obs.setValueDatetime(valueDate);
543         // } else if (hl7Datatype.equals("TS")) {
544         // Date valueTimestamp = HL7Util.parseHL7Timestamp(value);
545         // obs.setValueDatetime(valueTimestamp);
546         // } else if (hl7Datatype.equals("TM")) {
547         // Date valueTime = HL7Util.parseHL7Time(value);
548         // obs.setValueDatetime(valueTime);
549         // } else if (hl7Datatype.equals("ST"))
550         // obs.setValueText(value);
551         // else {
552         // // unsupported data type
553         // // TODO: support RP (report), SN (structured numeric)
554         // // do we need to support BIT just in case it slips thru?
555         // }
556         // Context.getObsService().createObs(obs);
557         //
558         // } catch (Exception e) {
559         // throw new HL7Exception(e);
560         // }
561         //
562         // }
563         //
564         // /**
565         // * Generates a ConceptProposal record
566         // */
567         // private void proposeConcept(Encounter encounter, Concept concept,
568         // String originalText, User enterer) {
569         // // value is a proposed concept, create a ConceptProposal
570         // // instead of an Obs for this observation
571         // // TODO: at this point if componentSeparator (^) is in text,
572         // // we'll only use the text before that delimiter!
573         // ConceptProposal conceptProposal = new ConceptProposal();
574         // conceptProposal.setOriginalText(originalText);
575         // conceptProposal.setState(OpenmrsConstants.CONCEPT_PROPOSAL_UNMAPPED);
576         // conceptProposal.setEncounter(encounter);
577         // conceptProposal.setObsConcept(concept);
578         // Context.getConceptService().proposeConcept(conceptProposal);
579         // }
580         //
581         // /**
582         // * Convenience method to respond to fatal errors by moving the queue entry
583         // * into an error bin prior to aborting
584         // */
585         // private void setFatalError(HL7InQueue hl7InQueue, String error,
586         // Throwable cause) {
587         // HL7InError hl7InError = new HL7InError(hl7InQueue);
588         // hl7InError.setError(error);
589         // hl7InError.setErrorDetails(cause == null ? "" : cause.getMessage());
590         // Context.getHL7Service().createHL7InError(hl7InError);
591         // Context.getHL7Service().deleteHL7InQueue(hl7InQueue);
592         // log.error(error, cause);
593         // }
594         //
595         // /**
596         // * Convenience method for tracking errors while processing observations
597         // * (non-fatal errors)
598         // */
599         // private void addError(Vector<Error> errors, String hl7Data, String error,
600         // String errorDetails) {
601         // errors.add(new Error(hl7Data, error, errorDetails));
602         // }
603         //
604         // /**
605         // * Record (non-fatal) errors that occurred while processing observtions
606         // */
607         // private void recordErrors(HL7InQueue hl7InQueue, Vector<Error> errors) {
608         // HL7InError hl7InError = new HL7InError();
609         // String hl7Data = "";
610         // String error = "";
611         // String errorDetails = "";
612         // for (Error err : errors) {
613         // hl7Data += err.hl7Data + "\r";
614         // error += err.error + "\n";
615         // errorDetails += err.errorDetails + "\n";
616         // }
617         // hl7InError.setHL7Source(hl7InQueue.getHL7Source());
618         // hl7InError.setHL7SourceKey(hl7InQueue.getHL7SourceKey());
619         // hl7InError.setHL7Data(hl7Data);
620         // hl7InError.setError(error);
621         // hl7InError.setErrorDetails(errorDetails);
622         // Context.getHL7Service().createHL7InError(hl7InError);
623         // }
624         //
625         // /**
626         // * Private inner class used to track errors while processing observations
627         // */
628         // private class Error {
629         // String hl7Data;
630         // String error;
631         // String errorDetails;
632         //
633         // Error(String hl7Data, String error, String errorDetails) {
634         // this.hl7Data = hl7Data;
635         // this.error = error;
636         // this.errorDetails = errorDetails;
637         // }
638         // }
639 }
Note: See TracBrowser for help on using the browser.