From be2ecafea5774e9b6b3f10ce54d1aa31d268b384 Mon Sep 17 00:00:00 2001 From: "Aaron M. Ucko" Date: Fri, 6 Oct 2017 17:22:03 -0400 Subject: New upstream version 7.30.20170918+ds --- download-pubmed | 18 +++ xtract.go | 469 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 427 insertions(+), 60 deletions(-) diff --git a/download-pubmed b/download-pubmed index 76694f1..d0e3169 100755 --- a/download-pubmed +++ b/download-pubmed @@ -2,15 +2,33 @@ useasp=`has-asp` +filter() { + while read fl + do + base=${fl%.xml.gz} + if [ -f "$fl" ] + then + continue + fi + if [ -f "$base.snt" ] + then + continue + fi + echo "$fl" + done +} + download() { if [ "$useasp" == "true" ] then asp-ls "pubmed/$1" | grep -v ".md5" | grep "xml.gz" | + filter | asp-cp "pubmed/$1" else ftp-ls ftp.ncbi.nlm.nih.gov "pubmed/$1" | grep -v ".md5" | grep "xml.gz" | + filter | ftp-cp ftp.ncbi.nlm.nih.gov "pubmed/$1" fi } diff --git a/xtract.go b/xtract.go index f841a96..10e3210 100644 --- a/xtract.go +++ b/xtract.go @@ -59,6 +59,7 @@ import ( "golang.org/x/text/runes" "golang.org/x/text/transform" "golang.org/x/text/unicode/norm" + "hash/crc32" "html" "io" "io/ioutil" @@ -335,6 +336,7 @@ Local Record Indexing -flag [strict|mixed|none] -gzip Use compression for local XML files + -hash Print UIDs and checksum values to stdout Sample File Download @@ -355,6 +357,11 @@ Human Subset Extraction run-ncbi-converter asn2all -i "$fl" -a t -b -c -O 9606 -f s > ${fl%.aso.gz}.xml done +Deleted PMID File Download + + ftp-cp ftp.ncbi.nlm.nih.gov /pubmed deleted.pmids.gz + gunzip deleted.pmids.gz + PubMed Download download-pubmed baseline updatefiles @@ -363,6 +370,11 @@ PubMed Archive Creation stash-pubmed mixed /Volumes/myssd/Pubmed +PubMed Archive Maintenance + + cat deleted.pmids | + erase-pubmed /Volumes/myssd/Pubmed + PubMed Archive Retrieval cat lycopene.uid | @@ -407,6 +419,15 @@ Reconstruct Release Files done rm -rf uids-??? +Experimental Postings File Creation + + efetch -db pubmed -id 12857958,2981625 -format xml | + xtract -e2index | + xtract -pattern IdxDocument -UID IdxUid \ + -block NORM -pfc "\n" -element "&UID",NORM | + LC_ALL='C' sort -k 2f -k 1n | + xtract -posting "/Volumes/myssd/Postings/NORM" + DISABLE ANTI-VIRUS FILE SCANNING FOR LOCAL ARCHIVES OR MOVE TO TRUSTED FILES DISABLE SPOTLIGHT INDEXING FOR EXTERNAL DISKS CONTAINING LOCAL ARCHIVES @@ -1000,8 +1021,8 @@ Phrase Indexing -pattern PubmedArticle \ -pfx " " -sfx "\n" \ -element MedlineCitation/PMID \ - -clr -rst -tab "\n" \ - -lbl " " \ + -clr -rst -tab "" \ + -lbl " \n" \ -indices ArticleTitle,AbstractText,Keyword \ -clr -lbl " \n" | xtract -pattern IdxDocument -UID IdxUid \ @@ -2273,7 +2294,9 @@ type Tables struct { Match string Attrib string Stash string + Posting string Zipp bool + Hash bool Hd string Tl string DeGloss bool @@ -6572,6 +6595,38 @@ func ProcessHydra(isPipe bool) []string { return acc } +// ENTREZ2INDEX COMMAND GENERATOR + +// ProcessE2Index generates extraction commands to create input for Entrez2Index (undocumented) +func ProcessE2Index(isPipe bool) []string { + + var acc []string + + if isPipe { + acc = append(acc, "-head", "", "-tail", "") + acc = append(acc, "-hd", " \\n", "-tl", " ") + acc = append(acc, "-pattern", "PubmedArticle") + acc = append(acc, "-pfx", " ", "-sfx", "\\n") + acc = append(acc, "-element", "MedlineCitation/PMID") + acc = append(acc, "-clr", "-rst", "-tab", "") + acc = append(acc, "-lbl", " \\n") + acc = append(acc, "-indices", "ArticleTitle,AbstractText,Keyword") + acc = append(acc, "-clr", "-lbl", " \\n") + } else { + acc = append(acc, "-head", "\"\"", "-tail", "\"\"") + acc = append(acc, "-hd", "\" \\n\"", "-tl", "\" \"") + acc = append(acc, "-pattern", "PubmedArticle") + acc = append(acc, "-pfx", "\" \"", "-sfx", "\"\\n\"") + acc = append(acc, "-element", "MedlineCitation/PMID") + acc = append(acc, "-clr", "-rst", "-tab", "\"\"") + acc = append(acc, "-lbl", "\" \\n\"") + acc = append(acc, "-indices", "ArticleTitle,AbstractText,Keyword") + acc = append(acc, "-clr", "-lbl", "\" \\n\"") + } + + return acc +} + // COLLECT AND FORMAT REQUESTED XML VALUES // ParseAttributes is only run if attribute values are requested in element statements @@ -7154,7 +7209,7 @@ func ProcessClause(curr *Node, stages []*Step, mask, prev, pfx, sfx, sep, def st processElement(func(str string) { if str != "" { words := strings.FieldsFunc(str, func(c rune) bool { - return !unicode.IsLetter(c) && !unicode.IsNumber(c) + return !unicode.IsLetter(c) && !unicode.IsDigit(c) }) for _, item := range words { item = strings.ToLower(item) @@ -7169,7 +7224,7 @@ func ProcessClause(curr *Node, stages []*Step, mask, prev, pfx, sfx, sep, def st processElement(func(str string) { if str != "" { words := strings.FieldsFunc(str, func(c rune) bool { - return !unicode.IsLetter(c) && !unicode.IsNumber(c) + return !unicode.IsLetter(c) && !unicode.IsDigit(c) }) if len(words) > 1 { past := "" @@ -7248,33 +7303,28 @@ func ProcessClause(curr *Node, stages []*Step, mask, prev, pfx, sfx, sep, def st str = DoHTMLReplace(str) } - // break terms at spaces, allowing hyphenated words + // break terms at spaces, allowing hyphenated terms terms := strings.Fields(str) - past := "" for _, item := range terms { item = html.UnescapeString(item) // allow parentheses in chemical formula item = TrimPunctuation(item) // skip numbers if IsAllNumeric(item) { - past = "" continue } - // index term and adjacent term pairs - past = addToIndex(item, past) + // index single term + addToIndex(item, "") } // break words at non-alphanumeric punctuation words := strings.FieldsFunc(str, func(c rune) bool { - return !unicode.IsLetter(c) && !unicode.IsNumber(c) + return !unicode.IsLetter(c) && !unicode.IsDigit(c) }) - past = "" + past := "" for _, item := range words { - item = html.UnescapeString(item) - // trim unescaped punctuation - item = TrimPunctuation(item) - // skip numbers - if IsAllNumeric(item) { + // skip anything starting with a digit + if len(item) < 1 || unicode.IsDigit(rune(item[0])) { past = "" continue } @@ -8434,10 +8484,10 @@ func ProcessQuery(Text, parent string, index int, cmds *Block, tbls *Tables, act // Stream tokens to obtain value of single index element // parseIndex recursive definition - var parseIndex func(string, string, string) (string, bool) + var parseIndex func(string, string, string) string // parse XML tags looking for trie index element - parseIndex = func(strt, attr, prnt string) (string, bool) { + parseIndex = func(strt, attr, prnt string) string { // check for attribute index match if attr != "" && tbls.Attrib != "" && strings.Contains(attr, tbls.Attrib) { @@ -8446,7 +8496,7 @@ func ProcessQuery(Text, parent string, index int, cmds *Block, tbls *Tables, act attribs := ParseAttributes(attr) for i := 0; i < len(attribs)-1; i += 2 { if attribs[i] == tbls.Attrib { - return attribs[i+1], true + return attribs[i+1] } } } @@ -8462,26 +8512,26 @@ func ProcessQuery(Text, parent string, index int, cmds *Block, tbls *Tables, act switch tag { case STARTTAG: - id, ok := parseIndex(name, attr, strt) - if !ok || id != "" { - return id, ok + id := parseIndex(name, attr, strt) + if id != "" { + return id } case SELFTAG: case STOPTAG: // break recursion - return "", true + return "" case CONTENTTAG: // check for content index match if strt == tbls.Match || tbls.Match == "" { if tbls.Parent == "" || prnt == tbls.Parent { - return name, true + return name } } default: } } - return "", false + return "" } // just return indexed identifier @@ -8508,13 +8558,7 @@ func ProcessQuery(Text, parent string, index int, cmds *Block, tbls *Tables, act tag, name, attr, idx = nextToken(Idx) } - id, ok := parseIndex(name, attr, parent) - - if !ok || id == "" { - return "" - } - - return id + return parseIndex(name, attr, parent) } // ProcessQuery @@ -8533,8 +8577,8 @@ func ProcessQuery(Text, parent string, index int, cmds *Block, tbls *Tables, act // FUNCTION TO CONVERT IDENTIFIER TO DIRECTORY PATH FOR LOCAL FILE ARCHIVE -// MakeTrie allows a short prefix of letters with an optional underscore, and splits the remainder into character pairs -func MakeTrie(str string, arry [132]rune) string { +// MakeArchiveTrie allows a short prefix of letters with an optional underscore, and splits the remainder into character pairs +func MakeArchiveTrie(str string, arry [132]rune) string { if len(str) > 64 { return "" @@ -8599,6 +8643,36 @@ func MakeTrie(str string, arry [132]rune) string { return strings.ToUpper(string(arry[:i])) } +// FUNCTION TO CONVERT TERM TO DIRECTORY PATH FOR POSTINGS FILE STORAGE + +// MakePostingsTrie splits a string into characters, separated by path delimiting slashes +func MakePostingsTrie(str string, arry [516]rune) string { + + if len(str) > 256 { + return "" + } + + i := 0 + doSlash := false + for _, ch := range str { + if doSlash { + arry[i] = '/' + i++ + } + if ch == ' ' { + ch = '_' + } + if !unicode.IsLetter(ch) && !unicode.IsDigit(ch) { + ch = '_' + } + arry[i] = ch + i++ + doSlash = true + } + + return strings.ToLower(string(arry[:i])) +} + // UNSHUFFLER USES HEAP TO RESTORE OUTPUT OF MULTIPLE CONSUMERS TO ORIGINAL RECORD ORDER type Extract struct { @@ -9059,12 +9133,12 @@ func CreateStashers(tbls *Tables, inp <-chan Extract) <-chan string { } // stashRecord saves individual XML record to archive file accessed by trie - stashRecord := func(text, id string, index int) { + stashRecord := func(text, id string, index int) string { var arry [132]rune - trie := MakeTrie(id, arry) + trie := MakeArchiveTrie(id, arry) if trie == "" { - return + return "" } attempts := 5 @@ -9083,11 +9157,11 @@ func CreateStashers(tbls *Tables, inp <-chan Extract) <-chan string { if attempts < 1 { // cannot get lock after several attempts fmt.Fprintf(os.Stderr, "\nERROR: Unable to save '%s'\n", id) - return + return "" } case BAIL: // later version is being saved, skip this one - return + return "" default: } } @@ -9097,7 +9171,7 @@ func CreateStashers(tbls *Tables, inp <-chan Extract) <-chan string { dpath := path.Join(tbls.Stash, trie) if dpath == "" { - return + return "" } _, err := os.Stat(dpath) if err != nil && os.IsNotExist(err) { @@ -9105,23 +9179,33 @@ func CreateStashers(tbls *Tables, inp <-chan Extract) <-chan string { } if err != nil { fmt.Println(err.Error()) - return + return "" } fpath := path.Join(dpath, id+sfx) if fpath == "" { - return + return "" } // overwrites and truncates existing file fl, err := os.Create(fpath) if err != nil { fmt.Println(err.Error()) - return + return "" } // remove leading spaces on each line str := trimLeft(text) + res := "" + + if tbls.Hash { + // calculate hash code for verification table + hsh := crc32.NewIEEE() + hsh.Write([]byte(str)) + val := hsh.Sum32() + res = strconv.FormatUint(uint64(val), 10) + } + if tbls.Zipp { zpr, err := gzip.NewWriterLevel(fl, gzip.BestCompression) @@ -9153,6 +9237,8 @@ func CreateStashers(tbls *Tables, inp <-chan Extract) <-chan string { fmt.Println(err.Error()) } fl.Close() + + return res } // xmlStasher reads from channel and calls stashRecord @@ -9162,9 +9248,14 @@ func CreateStashers(tbls *Tables, inp <-chan Extract) <-chan string { for ext := range inp { - stashRecord(ext.Text, ext.Ident, ext.Index) + hsh := stashRecord(ext.Text, ext.Ident, ext.Index) + res := ext.Ident + if tbls.Hash { + res += "\t" + hsh + } + res += "\n" - out <- ext.Ident + out <- res } } @@ -9216,7 +9307,7 @@ func CreateFetchers(tbls *Tables, inp <-chan Extract) <-chan Extract { file := ext.Text var arry [132]rune - trie := MakeTrie(file, arry) + trie := MakeArchiveTrie(file, arry) if trie == "" { continue } @@ -9289,6 +9380,160 @@ func CreateFetchers(tbls *Tables, inp <-chan Extract) <-chan Extract { return out } +func CreateTermListReader(in io.Reader, tbls *Tables) <-chan Extract { + + if in == nil || tbls == nil { + return nil + } + + out := make(chan Extract, tbls.ChanDepth) + if out == nil { + fmt.Fprintf(os.Stderr, "\nERROR: Unable to create term list reader channel\n") + os.Exit(1) + } + + // termReader reads uids and terms from input stream and sends through channel + termReader := func(in io.Reader, out chan<- Extract) { + + // close channel when all records have been processed + defer close(out) + + var buffer bytes.Buffer + + uid := "" + term := "" + prev := "" + count := 0 + + scanr := bufio.NewScanner(in) + + idx := 0 + for scanr.Scan() { + + // read lines of uid and term groups + line := scanr.Text() + idx++ + + uid, term = SplitInTwoAt(line, "\t", LEFT) + + if prev != "" && prev != term { + + str := buffer.String() + out <- Extract{idx, prev, str} + + buffer.Reset() + count = 0 + } + + buffer.WriteString(uid) + buffer.WriteString("\n") + count++ + + prev = term + } + + if count > 0 { + + str := buffer.String() + out <- Extract{idx, term, str} + + buffer.Reset() + } + } + + // launch single term reader goroutine + go termReader(in, out) + + return out +} + +func CreatePosters(tbls *Tables, inp <-chan Extract) <-chan string { + + if tbls == nil || inp == nil { + return nil + } + + out := make(chan string, tbls.ChanDepth) + if out == nil { + fmt.Fprintf(os.Stderr, "\nERROR: Unable to create poster channel\n") + os.Exit(1) + } + + // savePosting writes individual postings list to file accessed by radix trie + savePosting := func(text, id string, index int) { + + var arry [516]rune + trie := MakePostingsTrie(id, arry) + if trie == "" { + return + } + + dpath := path.Join(tbls.Posting, trie) + if dpath == "" { + return + } + _, err := os.Stat(dpath) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(dpath, os.ModePerm) + } + if err != nil { + fmt.Println(err.Error()) + return + } + fpath := path.Join(dpath, "uids.txt") + if fpath == "" { + return + } + + // appends if file exists, otherwise creates + fl, err := os.OpenFile(fpath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + fmt.Println(err.Error()) + return + } + + fl.WriteString(text) + if !strings.HasSuffix(text, "\n") { + fl.WriteString("\n") + } + + err = fl.Sync() + if err != nil { + fmt.Println(err.Error()) + } + fl.Close() + } + + // xmlPoster reads from channel and calls savePosting + xmlPoster := func(wg *sync.WaitGroup, inp <-chan Extract, out chan<- string) { + + defer wg.Done() + + for ext := range inp { + + savePosting(ext.Text, ext.Ident, ext.Index) + + out <- ext.Ident + } + } + + var wg sync.WaitGroup + + // launch multiple poster goroutines + for i := 0; i < tbls.NumServe; i++ { + wg.Add(1) + go xmlPoster(&wg, inp, out) + } + + // launch separate anonymous goroutine to wait until all posters are done + go func() { + wg.Wait() + close(out) + }() + + return out +} + // MAIN FUNCTION // e.g., xtract -pattern PubmedArticle -element MedlineCitation/PMID -block Author -sep " " -element Initials,LastName @@ -9366,9 +9611,15 @@ func main() { // path for local data indexed as trie stsh := "" + // path for postings files indexed as trie + pstg := "" + // use gzip compression on local data files zipp := false + // print UIDs and hash values + hshv := false + // convert UIDs to directory trie trei := false @@ -9461,12 +9712,21 @@ func main() { // local directory path for indexing case "-archive", "-stash": if len(args) < 2 { - fmt.Fprintf(os.Stderr, "\nERROR: Data path is missing\n") + fmt.Fprintf(os.Stderr, "\nERROR: Archive path is missing\n") os.Exit(1) } stsh = args[1] // skip past first of two arguments args = args[1:] + // local directory path for postings files (undocumented) + case "-posting", "-postings": + if len(args) < 2 { + fmt.Fprintf(os.Stderr, "\nERROR: Posting path is missing\n") + os.Exit(1) + } + pstg = args[1] + // skip past first of two arguments + args = args[1:] // file with selected indexes for removing duplicates case "-phrase": if len(args) < 2 { @@ -9478,6 +9738,8 @@ func main() { args = args[1:] case "-gzip": zipp = true + case "-hash": + hshv = true case "-trie", "-tries": trei = true // data cleanup flags @@ -9632,6 +9894,11 @@ func main() { if goGc >= 100 { fmt.Fprintf(os.Stderr, "Gogc %d\n", goGc) } + fi, err := os.Stdin.Stat() + if err == nil { + mode := fi.Mode().String() + fmt.Fprintf(os.Stderr, "Mode %s\n", mode) + } fmt.Fprintf(os.Stderr, "\n") return @@ -9640,7 +9907,7 @@ func main() { // if copying from local files accessed by identifier, add dummy argument to bypass length tests if stsh != "" && indx == "" { args = append(args, "-dummy") - } else if trei || cmpr { + } else if trei || cmpr || pstg != "" { args = append(args, "-dummy") } @@ -9656,6 +9923,18 @@ func main() { } } + // expand -posting ~/ to home directory path + if pstg != "" { + + if pstg[:2] == "~/" { + cur, err := user.Current() + if err == nil { + hom := cur.HomeDir + pstg = strings.Replace(pstg, "~/", hom+"/", 1) + } + } + } + if len(args) < 1 { fmt.Fprintf(os.Stderr, "\nERROR: Insufficient command-line arguments supplied to xtract\n") os.Exit(1) @@ -9725,6 +10004,10 @@ func main() { tbls.Stash = stsh // use compression for local archive files tbls.Zipp = zipp + // generate hash table on stash or fetch + tbls.Hash = hshv + // base location of local postings directory + tbls.Posting = pstg if indx != "" { @@ -9851,6 +10134,27 @@ func main() { args = hydra } + // EXPERIMENTAL ENTREZ2INDEX COMMAND GENERATOR + + // -e2index shortcut for experimental indexing code (undocumented) + if args[0] == "-e2index" { + + res := ProcessE2Index(isPipe || usingFile) + + if !isPipe && !usingFile { + // no piped input, so write output instructions + fmt.Printf("xtract") + for _, str := range res { + fmt.Printf(" %s", str) + } + fmt.Printf("\n") + return + } + + // data in pipe, so replace arguments, execute dynamically + args = res + } + // CONFIRM INPUT DATA AVAILABILITY AFTER RUNNING COMMAND GENERATORS if fileName == "" && runtime.GOOS != "windows" { @@ -10025,7 +10329,7 @@ func main() { file := scanr.Text() var arry [132]rune - trie := MakeTrie(file, arry) + trie := MakeArchiveTrie(file, arry) if trie == "" || file == "" { continue } @@ -10042,6 +10346,35 @@ func main() { return } + // CREATE POSTINGS FILES USING TRIE ON TERM CHARACTERS + + // -posting produces postings files (undocumented) + if pstg != "" { + + trml := CreateTermListReader(rdr.Reader, tbls) + pstr := CreatePosters(tbls, trml) + + if trml == nil || pstr == nil { + fmt.Fprintf(os.Stderr, "\nERROR: Unable to create postings generator\n") + os.Exit(1) + } + + // drain output channel + for _ = range pstr { + + recordCount++ + runtime.Gosched() + } + + debug.FreeOSMemory() + + if timr { + printDuration("terms") + } + + return + } + // CHECK FOR MISSING RECORDS IN LOCAL DIRECTORY INDEXED BY TRIE ON IDENTIFIER // -archive plus -missing checks for missing records @@ -10059,7 +10392,7 @@ func main() { file := scanr.Text() var arry [132]rune - trie := MakeTrie(file, arry) + trie := MakeArchiveTrie(file, arry) if trie == "" || file == "" { continue } @@ -10080,7 +10413,7 @@ func main() { _, err = os.Stat(fpath) } if err != nil && os.IsNotExist(err) { - // identifier is missing from local file cache + // record is missing from local file cache os.Stdout.WriteString(file) os.Stdout.WriteString("\n") } @@ -10117,23 +10450,33 @@ func main() { continue } + recordCount++ + if hd != "" { os.Stdout.WriteString(hd) os.Stdout.WriteString("\n") } - // send result to output - os.Stdout.WriteString(curr.Text) - if !strings.HasSuffix(curr.Text, "\n") { - os.Stdout.WriteString("\n") + if hshv { + // calculate hash code for verification table + hsh := crc32.NewIEEE() + hsh.Write([]byte(curr.Text)) + val := hsh.Sum32() + res := strconv.FormatUint(uint64(val), 10) + txt := curr.Ident + "\t" + res + "\n" + os.Stdout.WriteString(txt) + } else { + // send result to output + os.Stdout.WriteString(curr.Text) + if !strings.HasSuffix(curr.Text, "\n") { + os.Stdout.WriteString("\n") + } } if tl != "" { os.Stdout.WriteString(tl) os.Stdout.WriteString("\n") } - - recordCount++ } if tail != "" { @@ -10224,7 +10567,7 @@ func main() { } var arry [132]rune - trie := MakeTrie(id, arry) + trie := MakeArchiveTrie(id, arry) if trie == "" { return } @@ -10356,7 +10699,13 @@ func main() { } // drain output channel - for _ = range stsq { + for str := range stsq { + + if hshv { + // print table of UIDs and hash values + os.Stdout.WriteString(str) + } + recordCount++ runtime.Gosched() } @@ -10426,7 +10775,7 @@ func main() { var buffer bytes.Buffer for _, ch := range str { - if unicode.IsLetter(ch) || unicode.IsNumber(ch) { + if unicode.IsLetter(ch) || unicode.IsDigit(ch) { buffer.WriteRune(ch) } else if ch == '<' || ch == '>' { buffer.WriteRune(' ') -- cgit v1.2.3