SNAP Library 6.0, Developer Reference  2020-12-09 16:24:20
SNAP, a general purpose, high performance system for analysis and manipulation of large networks
table.cpp
Go to the documentation of this file.
2  if (Left != NULL) { Left->GetVariables(Variables); }
3  if (Right != NULL) { Right->GetVariables(Variables); }
4  if (Op == NOP) {
5  if (Atom.Lvar != "" ) { Variables.Add(Atom.Lvar); }
6  if (Atom.Rvar != "" ) { Variables.Add(Atom.Rvar); }
7  }
8 }
9 
10 void TPredicate::GetVariables(TStrV& Variables) {
11  Root->GetVariables(Variables);
12 }
13 
15  TPredicateNode* Curr = Root;
16  TPredicateNode* Prev = NULL;
17  while (!(Curr == NULL && Prev == Root)) {
18  // going down the tree
19  if (Prev == NULL || Prev == Curr->Parent) {
20  // left child exists and was not yet evaluated
21  if (Curr->Left != NULL) {
22  Prev = Curr;
23  Curr = Curr->Left;
24  } else if (Curr->Right != NULL) {
25  Prev = Curr;
26  Curr = Curr->Right;
27  } else {
28  Curr->Result = EvalAtomicPredicate(Curr->Atom);
29  Prev = Curr;
30  Curr = Curr->Parent;
31  }
32  } else if (Prev == Curr->Left) {
33  // going back up through left (first) child
34  switch (Curr->Op) {
35  case NOT: {
36  Assert(Curr->Right == NULL);
37  Curr->Result = !(Prev->Result);
38  Prev = Curr;
39  Curr = Curr->Parent;
40  break;
41  }
42  case AND: {
43  Assert(Curr->Right != NULL);
44  if (!Prev->Result) {
45  Curr->Result = false;
46  Prev = Curr;
47  Curr = Curr->Parent;
48  } else {
49  Prev = Curr;
50  Curr = Curr->Right;
51  }
52  break;
53  }
54  case OR: {
55  Assert(Curr->Right != NULL);
56  if (Prev->Result) {
57  Curr->Result = true;
58  Prev = Curr;
59  Curr = Curr->Parent;
60  } else {
61  Prev = Curr;
62  Curr = Curr->Right;
63  }
64  break;
65  }
66  case NOP: {
67  break;
68  }
69  }
70  } else {
71  // going back up the tree from right (second) child
72  Assert(Prev == Curr->Right);
73  switch (Curr->Op) {
74  case NOT: {
75  Assert(Curr->Left == NULL);
76  Curr->Result = !(Prev->Result);
77  break;
78  }
79  case AND: {
80  Assert(Curr->Left != NULL);
81  Assert(Curr->Left->Result);
82  Curr->Result = Prev->Result;
83  break;
84  }
85  case OR: {
86  Assert(Curr->Left != NULL);
87  Assert(!Curr->Left->Result);
88  Curr->Result = Prev->Result;
89  break;
90  }
91  case NOP: {
92  break;
93  }
94  }
95  Prev = Curr;
96  Curr = Curr->Parent;
97  }
98  }
99  return Root->Result;
100 }
101 
103  switch (Atom.Type) {
104  case atInt: {
105  if (Atom.IsConst) {
106  return EvalAtom<TInt>(IntVars.GetDat(Atom.Lvar), Atom.IntConst, Atom.Compare);
107  }
108  return EvalAtom<TInt>(IntVars.GetDat(Atom.Lvar), IntVars.GetDat(Atom.Rvar), Atom.Compare);
109  }
110  case atFlt: {
111  if (Atom.IsConst) {
112  return EvalAtom<TFlt>(FltVars.GetDat(Atom.Lvar), Atom.FltConst, Atom.Compare);
113  }
114  return EvalAtom<TFlt>(FltVars.GetDat(Atom.Lvar), FltVars.GetDat(Atom.Rvar), Atom.Compare);
115  }
116  case atStr: {
117  if (Atom.IsConst) {
118  return EvalAtom<TStr>(StrVars.GetDat(Atom.Lvar), Atom.StrConst, Atom.Compare);
119  }
120  return EvalAtom<TStr>(StrVars.GetDat(Atom.Lvar), StrVars.GetDat(Atom.Rvar), Atom.Compare);
121  }
122  }
123  return false;
124 }
125 
126 TInt const TTable::Last = -1;
127 TInt const TTable::Invalid = -2;
128 
129 TInt TTable::UseMP = 1;
130 
132  return this->Next();
133 }
134 
137  //Assert(CurrRowIdx != TTable::Invalid);
138  return *this;
139 }
140 
141 bool TRowIterator::operator < (const TRowIterator& RowI) const{
142  if (CurrRowIdx == TTable::Last) { return false; }
143  if (RowI.CurrRowIdx == TTable::Last) { return true; }
144  return CurrRowIdx < RowI.CurrRowIdx;
145 }
146 
147 bool TRowIterator::operator == (const TRowIterator& RowI) const {
148  return CurrRowIdx == RowI.CurrRowIdx;
149 }
150 
152  return CurrRowIdx;
153 }
154 // We do not check column type in the iterator.
156  return Table->IntCols[ColIdx][CurrRowIdx];
157 }
158 
160  return Table->FltCols[ColIdx][CurrRowIdx];
161 }
162 
164  return Table->GetStrValIdx(ColIdx, CurrRowIdx);
165 }
166 
167 TInt TRowIterator::GetIntAttr(const TStr& Col) const {
168  TInt ColIdx = Table->GetColIdx(Col);
169  return Table->IntCols[ColIdx][CurrRowIdx];
170 }
171 
172 TFlt TRowIterator::GetFltAttr(const TStr& Col) const {
173  TInt ColIdx = Table->GetColIdx(Col);
174  return Table->FltCols[ColIdx][CurrRowIdx];
175 }
176 
177 TStr TRowIterator::GetStrAttr(const TStr& Col) const {
178  return Table->GetStrVal(Col, CurrRowIdx);
179 }
180 
182  TInt ColIdx = Table->GetColIdx(Col);
183  return Table->StrColMaps[ColIdx][CurrRowIdx];
184 }
185 
187  return Table->StrColMaps[ColIdx][CurrRowIdx];
188 }
189 
191  TBool Result;
192  switch (Val.GetType()) {
193  case atInt:
194  Result = TPredicate::EvalAtom(GetIntAttr(ColIdx), Val.GetInt(), Cmp);
195  break;
196  case atFlt:
197  Result = TPredicate::EvalAtom(GetFltAttr(ColIdx), Val.GetFlt(), Cmp);
198  break;
199  case atStr:
200  Result = TPredicate::EvalStrAtom(GetStrAttr(ColIdx), Val.GetStr(), Cmp);
201  break;
202  default:
203  Result = TBool(false);
204  }
205  return Result;
206 }
207 
209  TBool Result;
210  //printf("string compare\n");
211  Result = TPredicate::EvalStrAtom(GetStrAttr(ColIdx), Val, Cmp);
212  return Result;
213 }
214 
216  CurrRowIdx(RowIdx), Table(TablePtr), Start(RowIdx == TablePtr->FirstValidRow) {}
217 
219  return this->Next();
220 }
221 
224  Start = false;
225  Assert(CurrRowIdx != TTable::Invalid);
226  return *this;
227 }
228 
230  if (CurrRowIdx == TTable::Last) { return false; }
231  if (RowI.CurrRowIdx == TTable::Last) { return true; }
232  return CurrRowIdx < RowI.CurrRowIdx;
233 }
234 
236  return CurrRowIdx == RowI.CurrRowIdx;
237 }
238 
240  return CurrRowIdx;
241 }
242 
244  return (Start ? Table->FirstValidRow : Table->Next[CurrRowIdx]);
245 }
246 
247 // We do not check column type in the iterator.
249  return Table->IntCols[ColIdx][GetNextRowIdx()];
250 }
251 
253  return Table->FltCols[ColIdx][GetNextRowIdx()];
254 }
255 
257  return Table->GetStrValIdx(ColIdx, GetNextRowIdx());
258 }
259 
261  TInt ColIdx = Table->GetColIdx(Col);
262  return Table->IntCols[ColIdx][GetNextRowIdx()];
263 }
264 
266  TInt ColIdx = Table->GetColIdx(Col);
267  return Table->FltCols[ColIdx][GetNextRowIdx()];
268 }
269 
271  return Table->GetStrVal(Col, GetNextRowIdx());
272 }
273 
275  return CurrRowIdx == Table->FirstValidRow;
276 }
277 
280 }
281 
283  TBool Result;
284  switch (Val.GetType()) {
285  case atInt:
286  Result = TPredicate::EvalAtom(GetNextIntAttr(ColIdx), Val.GetInt(), Cmp);
287  break;
288  case atFlt:
289  Result = TPredicate::EvalAtom(GetNextFltAttr(ColIdx), Val.GetFlt(), Cmp);
290  break;
291  case atStr:
292  Result = TPredicate::EvalStrAtom(GetNextStrAttr(ColIdx), Val.GetStr(), Cmp);
293  break;
294  default:
295  Result = TBool(false);
296  }
297  return Result;
298 }
299 
300 // Better not use default constructor as it leads to a memory leak.
301 // - OR - implement a destructor.
302 TTable::TTable(): Context(new TTableContext), NumRows(0), NumValidRows(0),
303  FirstValidRow(0), LastValidRow(-1) {}
304 
305 TTable::TTable(TTableContext* Context): Context(Context), NumRows(0),
306  NumValidRows(0), FirstValidRow(0), LastValidRow(-1) {}
307 
308 TTable::TTable(const Schema& TableSchema, TTableContext* Context): Context(Context),
309  NumRows(0), NumValidRows(0), FirstValidRow(0), LastValidRow(-1), IsNextDirty(0) {
310  TInt IntColCnt = 0;
311  TInt FltColCnt = 0;
312  TInt StrColCnt = 0;
313  for (TInt i = 0; i < TableSchema.Len(); i++) {
314  TStr ColName = TableSchema[i].Val1;
315  TAttrType ColType = TableSchema[i].Val2;
316  AddSchemaCol(ColName, ColType);
317  switch (ColType) {
318  case atInt:
319  AddColType(ColName, atInt, IntColCnt);
320  IntColCnt++;
321  break;
322  case atFlt:
323  AddColType(ColName, atFlt, FltColCnt);
324  FltColCnt++;
325  break;
326  case atStr:
327  AddColType(ColName, atStr, StrColCnt);
328  StrColCnt++;
329  break;
330  }
331  }
332  IntCols = TVec<TIntV>(IntColCnt);
333  FltCols = TVec<TFltV>(FltColCnt);
334  StrColMaps = TVec<TIntV>(StrColCnt);
335 }
336 
338  ColTypeMap.Clr();
339  Sch.Clr();
340  for (THash<TStr,TPair<TInt,TInt> >::TIter it = ColTypeIntMap.BegI(); it < ColTypeIntMap.EndI(); it++) {
341  TPair<TInt,TInt> dat = it.GetDat();
342  switch (dat.GetVal1()) {
343  case 0:
344  AddColType(it.GetKey(), atInt, dat.GetVal2());
345  AddSchemaCol(it.GetKey(), atInt);
346  break;
347  case 1:
348  AddColType(it.GetKey(), atFlt, dat.GetVal2());
349  AddSchemaCol(it.GetKey(), atFlt);
350  break;
351  case 2:
352  AddColType(it.GetKey(), atStr, dat.GetVal2());
353  AddSchemaCol(it.GetKey(), atStr);
354  break;
355  }
356  }
357  IsNextDirty = 0;
358 }
359 
360 void TTable::LoadTableShM(TShMIn& ShMIn, TTableContext* ContextTable) {
361  Context = ContextTable;
362  NumRows = TInt(ShMIn);
363  NumValidRows = TInt(ShMIn);
364  FirstValidRow = TInt(ShMIn);
365  LastValidRow = TInt(ShMIn);
366  Next.LoadShM(ShMIn);
367 
368  TLoadVecInit Fn;
369  IntCols.LoadShM(ShMIn, Fn);
370  FltCols.Load(ShMIn);
371  StrColMaps.LoadShM(ShMIn, Fn);
372  THash<TStr,TPair<TInt,TInt> > ColTypeIntMap;
373  ColTypeIntMap.LoadShM(ShMIn);
374 
375  GenerateColTypeMap(ColTypeIntMap);
376 }
377 
378 TTable::TTable(TSIn& SIn, TTableContext* Context): Context(Context), NumRows(SIn),
379  NumValidRows(SIn), FirstValidRow(SIn), LastValidRow(SIn), Next(SIn), IntCols(SIn),
380  FltCols(SIn), StrColMaps(SIn) {
381  THash<TStr,TPair<TInt,TInt> > ColTypeIntMap(SIn);
382  GenerateColTypeMap(ColTypeIntMap);
383 }
384 
385 TTable::TTable(const TIntIntH& H, const TStr& Col1, const TStr& Col2,
386  TTableContext* Context, const TBool IsStrKeys) : Context(Context), NumRows(H.Len()),
387  NumValidRows(H.Len()), FirstValidRow(0), LastValidRow(H.Len()-1) {
388  TAttrType KeyType = IsStrKeys ? atStr : atInt;
389  AddSchemaCol(Col1, KeyType);
390  AddSchemaCol(Col2, atInt);
391  AddColType(Col1, KeyType, 0);
392  AddColType(Col2, atInt, 1);
393  if (IsStrKeys) {
394  StrColMaps = TVec<TIntV>(1);
395  IntCols = TVec<TIntV>(1);
396  H.GetKeyV(StrColMaps[0]);
397  H.GetDatV(IntCols[0]);
398  } else {
399  IntCols = TVec<TIntV>(2);
400  H.GetKeyV(IntCols[0]);
401  H.GetDatV(IntCols[1]);
402  }
403  Next = TIntV(NumRows);
404  for (TInt i = 0; i < NumRows; i++) {
405  Next[i] = i+1;
406  }
407  Next[NumRows-1] = Last;
408  IsNextDirty = 0;
409  InitIds();
410 }
411 
412 TTable::TTable(const TIntFltH& H, const TStr& Col1, const TStr& Col2,
413  TTableContext* Context, const TBool IsStrKeys) : Context(Context),
414  NumRows(H.Len()), NumValidRows(H.Len()), FirstValidRow(0), LastValidRow(H.Len()-1) {
415  TAttrType KeyType = IsStrKeys ? atStr : atInt;
416  AddSchemaCol(Col1, KeyType);
417  AddSchemaCol(Col2, atFlt);
418  AddColType(Col1, KeyType, 0);
419  AddColType(Col2, atFlt, 0);
420  if (IsStrKeys) {
421  StrColMaps = TVec<TIntV>(1);
422  H.GetKeyV(StrColMaps[0]);
423  } else {
424  IntCols = TVec<TIntV>(1);
425  H.GetKeyV(IntCols[0]);
426  }
427  FltCols = TVec<TFltV>(1);
428  H.GetDatV(FltCols[0]);
429  Next = TIntV(NumRows);
430  for (TInt i = 0; i < NumRows; i++) {
431  Next[i] = i+1;
432  }
433  Next[NumRows-1] = Last;
434  IsNextDirty = 0;
435  InitIds();
436 }
437 
438 TTable::TTable(const TTable& Table, const TIntV& RowIDs) : Context(Table.Context),
439  Sch(Table.Sch), SrcCol(Table.SrcCol), DstCol(Table.DstCol), EdgeAttrV(Table.EdgeAttrV),
440  SrcNodeAttrV(Table.SrcNodeAttrV), DstNodeAttrV(Table.DstNodeAttrV),
441  CommonNodeAttrs(Table.CommonNodeAttrs) {
442  ColTypeMap = Table.ColTypeMap;
443  IntCols = TVec<TIntV>(Table.IntCols.Len());
444  FltCols = TVec<TFltV>(Table.FltCols.Len());
446  FirstValidRow = 0;
447  LastValidRow = -1;
448  NumRows = 0;
449  NumValidRows = 0;
450  AddSelectedRows(Table, RowIDs);
451  IsNextDirty = 0;
452  InitIds();
453 }
454 
455 void TTable::GetSchema(const TStr& InFNm, Schema& S, const char& Separator) {
456  // Determine Attr Type
457  // Assume that the data is tab separated
458  TSsParser Ss(InFNm, '\t', false, false, false);
459  TInt rowsToPeek = 1000;
460  TInt currRow = 0;
461  TInt lastComment = 0;
462  while (Ss.Next()) {
463  if (Ss.IsCmt()) {
464  lastComment += 1;
465  }
466  else break;
467  }
468  if (Ss.Eof()) {TExcept::Throw("No Data to determine attribute types!");}
469  TInt numCols = Ss.GetFlds();
470  TVec<TAttrType> colAttrV(numCols);
471  colAttrV.PutAll(atInt);
472  while (true) {
473  for (TInt i = 0; i < numCols; i++) {
474  if (Ss.IsInt(i)) {
475  }
476  else if (Ss.IsFlt(i)) {
477  colAttrV[i] = atFlt;
478  }
479  else {
480  colAttrV[i] = atStr;
481  }
482  }
483  currRow++;
484  if (currRow > rowsToPeek || Ss.Eof()) break;
485  Ss.Next();
486  }
487  // Default Separator is tab
488  TSsParser SsNames(InFNm, Separator, false, false, false);
489  for (int i = 0; i < lastComment; i++) { SsNames.Next();}
490  TVec<TStr> attrV;
491  TStr first(SsNames[0]);
492  int begin = 0;
493  TStr comment('#');
494  if (first != comment) {
495  for (int i = 1; i < first.Len(); i++){
496  if (first[i] != ' ') { begin = i; break;}
497  }
498  attrV.Add(first.GetSubStr(begin));
499  }
500  for (int i = 1; i < SsNames.GetFlds(); i++) {attrV.Add(SsNames[i]);}
501  for (TInt i = 0; i < numCols; i++) {
502  S.Add(TPair<TStr,TAttrType>(attrV[i],colAttrV[i]));
503  }
504 }
505 
#ifdef GCC_ATOMIC
// Loads the data rows of InFNm into the pre-created table T in parallel.
// T must already carry schema S, restricted to RelevantCols when that is
// non-empty. Only int and float columns are supported.
//
// The input after the optional title line is split into one byte range per
// OpenMP thread:
//   - pass 1 counts the lines in each range, to size the columns and compute
//     each range's first row index;
//   - pass 2 parses the ranges concurrently and writes rows in place.
// Finally the Next chain and an "_id" column are built.
//
// Throws TExcept if the title line does not match S, if a row's field count
// differs from S, or if a string column is found.
void TTable::LoadSSPar(PTable& T, const Schema& S, const TStr& InFNm, const TIntV& RelevantCols,
 const char& Separator, TBool HasTitleLine) {
  // preloaded necessary variables
  TInt RowLen = T->Sch.Len();
  TVec<TAttrType> ColTypes = TVec<TAttrType>(RowLen);
  for (TInt i = 0; i < RowLen; i++) {
    ColTypes[i] = T->GetSchemaColType(i);
  }

  TSsParserMP Ss(InFNm, Separator);
  Ss.SkipCommentLines();

  // if title line (i.e. names of the columns) is included as first row in the
  // input file - use it to validate schema
  if (HasTitleLine) {
    Ss.Next();
    if (S.Len() != Ss.GetFlds()) {
      printf("%s\n", Ss[0]); TExcept::Throw("Table Schema Mismatch!");
    }
    for (TInt i = 0; i < Ss.GetFlds(); i++) {
      // Remove a trailing carriage return (or other control char). An empty
      // field has no last character to inspect.
      TInt L = strlen(Ss[i]);
      if (L > 0 && Ss[i][L-1] < ' ') { Ss[i][L-1] = 0; }
      if (NormalizeColName(S[i].Val1) != NormalizeColName(Ss[i])) { TExcept::Throw("Table Schema Mismatch!"); }
    }
  }

  // Divide remaining part of stream into equal sized chunks
  // Find starting position in stream for each thread
  int64 Cnt = 0;
  uint64 Pos = Ss.GetStreamPos();
  uint64 Len = Ss.GetStreamLen();
  uint64 Rem = Len - Pos;
  int NumThreads = omp_get_max_threads();

  uint64 Delta = Rem / NumThreads;
  if (Delta < 1) Delta = 1;

  TVec<uint64> StartIntV(NumThreads);
  TVec<uint64> LineCountV(NumThreads);
  TVec<uint64> PrefixSumV(NumThreads);

  StartIntV[0] = Pos;
  for (int i = 1; i < NumThreads; i++) {
    // When fewer bytes remain than there are threads, Delta is forced to 1.
    // Later chunk starts would then lie past the end of the stream, so clamp
    // them to Len; such chunks are simply empty.
    StartIntV[i] = StartIntV[i-1] + Delta;
    if (StartIntV[i] > Len) { StartIntV[i] = Len; }
  }
  StartIntV.Add(Len);

  // Find number of lines handled by each thread
  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(dynamic) reduction(+:Cnt)
  for (int i = 0; i < NumThreads; i++) {
    LineCountV[i] = Ss.CountNewLinesInRange(StartIntV[i], StartIntV[i+1]);
    Cnt += LineCountV[i];
  }

  // Calculate row index offsets for each thread
  PrefixSumV[0] = 0;
  for (int i = 1; i < NumThreads; i++) {
    PrefixSumV[i] = PrefixSumV[i-1] + LineCountV[i-1];
  }
  Ss.SetStreamPos(Pos);

  // allocate memory for columns
  TInt IntColIdx = 0;
  TInt FltColIdx = 0;
  for (TInt i = 0; i < RowLen; i++) {
    switch (ColTypes[i]) {
      case atInt:
        T->IntCols[IntColIdx].Gen(Cnt);
        IntColIdx++;
        break;
      case atFlt:
        T->FltCols[FltColIdx].Gen(Cnt);
        FltColIdx++;
        break;
      case atStr:
        break;
    }
  }

  // An exception must not escape an OpenMP parallel region; doing so
  // terminates the program. Parse errors are therefore recorded here and
  // thrown after the region. 0 = ok, 1 = bad field count, 2 = string column.
  int ParseErr = 0;
  Cnt = 0;
  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(dynamic) reduction(+:Cnt)
  for (int i = 0; i < NumThreads; i++) {
    // calculate beginning of each line handled by thread
    TVec<uint64> LineStartPosV = Ss.GetStartPosV(StartIntV[i], StartIntV[i+1]);

    // parse line and fill rows
    for (uint64 k = 0; k < (uint64) LineStartPosV.Len(); k++) {
      TVec<char*> FieldsV;
      Ss.NextFromIndex(LineStartPosV[k], FieldsV);
      if (FieldsV.Len() != S.Len()) {
        #pragma omp critical(LoadSSParError)
        {
          if (ParseErr == 0) { ParseErr = 1; }
        }
        continue;
      }
      TInt IntColIdx = 0;
      TInt FltColIdx = 0;
      TInt RowIdx = PrefixSumV[i] + k;

      for (TInt j = 0; j < RowLen; j++) {
        switch (ColTypes[j]) {
          case atInt:
            if (RelevantCols.Len() == 0) {
              T->IntCols[IntColIdx][RowIdx] = (Ss.GetIntFromFldV(FieldsV, j));
            } else {
              T->IntCols[IntColIdx][RowIdx] = (Ss.GetIntFromFldV(FieldsV, RelevantCols[j]));
            }
            IntColIdx++;
            break;
          case atFlt:
            if (RelevantCols.Len() == 0) {
              T->FltCols[FltColIdx][RowIdx] = (Ss.GetFltFromFldV(FieldsV, j));
            } else {
              T->FltCols[FltColIdx][RowIdx] = (Ss.GetFltFromFldV(FieldsV, RelevantCols[j]));
            }
            FltColIdx++;
            break;
          case atStr: {
            #pragma omp critical(LoadSSParError)
            {
              if (ParseErr == 0) { ParseErr = 2; }
            }
            break;
          }
        }
      }
      Cnt++;
    }
  }
  if (ParseErr == 1) { TExcept::Throw("Error reading tsv file"); }
  if (ParseErr == 2) { TExcept::Throw("TTable::LoadSS:: Str Col found\n"); }

  // set number of rows and "Next" vector
  T->NumRows = Cnt;
  T->NumValidRows = T->NumRows;

  T->Next.Clr();
  T->Next.Gen(Cnt);

  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(dynamic, 10000)
  for (int64 i = 0; i < Cnt-1; i++) {
    T->Next[i] = i+1;
  }
  T->IsNextDirty = 0;
  // An empty input has no last row to terminate (Next[Cnt-1] would be Next[-1]).
  if (Cnt > 0) { T->Next[Cnt-1] = Last; }
  T->LastValidRow = T->NumRows - 1;

  T->IdColName = "_id";
  TInt IdCol = T->IntCols.Add();
  T->IntCols[IdCol].Gen(Cnt);

  // initialize ID column
  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(dynamic, 10000)
  for (int64 i = 0; i < Cnt; i++) {
    T->IntCols[IdCol][i] = i;
  }

  T->AddSchemaCol(T->IdColName, atInt);
  T->AddColType(T->IdColName, atInt, T->IntCols.Len()-1);
}
#endif // GCC_ATOMIC
668 
670  PTable& T, const Schema& S, const TStr& InFNm, const TIntV& RelevantCols,
671  const char& Separator, TBool HasTitleLine) {
672  // preloaded necessary variables
673  int RowLen = T->Sch.Len();
674  TVec<TAttrType> ColTypes = TVec<TAttrType>(RowLen);
675  for (int i = 0; i < RowLen; i++) {
676  ColTypes[i] = T->GetSchemaColType(i);
677  }
678 
679  // Sequential load
680  TSsParser Ss(InFNm, Separator);
681  // if title line (i.e. names of the columns) is included as first row in the
682  // input file - use it to validate schema
683  if (HasTitleLine) {
684  Ss.Next();
685  if (S.Len() != Ss.GetFlds()) {
686  printf("%s\n", Ss[0]); TExcept::Throw("Table Schema Mismatch!");
687  }
688  for (int i = 0; i < Ss.GetFlds(); i++) {
689  // remove carriage return char
690  int L = strlen(Ss[i]);
691  if (Ss[i][L-1] < ' ') { Ss[i][L-1] = 0; }
692  if (NormalizeColName(S[i].Val1) != NormalizeColName(Ss[i])) { TExcept::Throw("Table Schema Mismatch!"); }
693  }
694  }
695 
696  // populate table columns
697  //printf("starting to populate table\n");
698  uint64 Cnt = 0;
699  while (Ss.Next()) {
700  int IntColIdx = 0;
701  int FltColIdx = 0;
702  int StrColIdx = 0;
703  Assert(Ss.GetFlds() == S.Len()); // compiled only in debug
704  if (Ss.GetFlds() != S.Len()) {
705  printf("%s\n", Ss[S.Len()]); TExcept::Throw("Error reading tsv file");
706  }
707  for (int i = 0; i < RowLen; i++) {
708  switch (ColTypes[i]) {
709  case atInt:
710  if (RelevantCols.Len() == 0) {
711  T->IntCols[IntColIdx].Add(Ss.GetInt(i));
712  } else {
713  T->IntCols[IntColIdx].Add(Ss.GetInt(RelevantCols[i]));
714  }
715  IntColIdx++;
716  break;
717  case atFlt:
718  if (RelevantCols.Len() == 0) {
719  T->FltCols[FltColIdx].Add(Ss.GetFlt(i));
720  } else {
721  T->FltCols[FltColIdx].Add(Ss.GetFlt(RelevantCols[i]));
722  }
723  FltColIdx++;
724  break;
725  case atStr:
726  int ColIdx;
727  if (RelevantCols.Len() == 0) {
728  ColIdx = i;
729  } else {
730  ColIdx = RelevantCols[i];
731  }
732  TStr Sval = TStr(Ss[ColIdx]);
733  T->AddStrVal(StrColIdx, Sval);
734  StrColIdx++;
735  break;
736  }
737  }
738  Cnt += 1;
739  }
740  //printf("finished populating table\n");
741  // set number of rows and "Next" vector
742  T->NumRows = static_cast<int>(Cnt);
743  T->NumValidRows = T->NumRows;
744 
745  T->Next.Clr();
746  T->Next.Gen(static_cast<int>(Cnt));
747  for (uint64 i = 0; i < Cnt-1; i++) {
748  T->Next[static_cast<int>(i)] = static_cast<int>(i+1);
749  }
750  T->IsNextDirty = 0;
751  T->Next[static_cast<int>(Cnt-1)] = Last;
752  T->LastValidRow = T->NumRows - 1;
753 
754  T->InitIds();
755 }
756 
757 PTable TTable::LoadSS(const Schema& S, const TStr& InFNm, TTableContext* Context,
758  const TIntV& RelevantCols, const char& Separator, TBool HasTitleLine) {
759  TVec<uint64> IntGroupByCols;
760  bool NoStringCols = true;
761 
762  // find the schema for the new table which contains only relevant columns
763  Schema SR;
764  if (RelevantCols.Len() == 0) {
765  SR = S;
766  } else {
767  for (int i = 0; i < RelevantCols.Len(); i++) {
768  SR.Add(S[RelevantCols[i]]);
769  }
770  }
771  PTable T = New(SR, Context);
772 
773  // find col types and check for string cols
774  for (int i = 0; i < SR.Len(); i++) {
775  if (T->GetSchemaColType(i) == atStr) {
776  NoStringCols = false;
777  break;
778  }
779  }
780 
781  if (GetMP() && NoStringCols) {
782  // Right now, can load in parallel only in Linux (for mmap) and if
783  // there are no string columns
784 #ifdef GLib_LINUX
785  LoadSSPar(T, S, InFNm, RelevantCols, Separator, HasTitleLine);
786 #else
787  LoadSSSeq(T, S, InFNm, RelevantCols, Separator, HasTitleLine);
788 #endif
789  } else {
790  LoadSSSeq(T, S, InFNm, RelevantCols, Separator, HasTitleLine);
791  }
792  return T;
793 }
794 
795 PTable TTable::LoadSS(const Schema& S, const TStr& InFNm, TTableContext* Context,
796  const char& Separator, TBool HasTitleLine) {
797  return LoadSS(S, InFNm, Context, TIntV(), Separator, HasTitleLine);
798 }
799 
800 void TTable::SaveSS(const TStr& OutFNm) {
801  if (NumValidRows == 0) {
802  printf("Table is empty");
803  return;
804  }
805  FILE* F = fopen(OutFNm.CStr(), "w");
806  // debug
807  if (F == NULL) {
808  printf("failed to open file %s\n", OutFNm.CStr());
809  perror("fail ");
810  return;
811  }
812 
813  Dump(F);
814 
815 #if 0
816  Schema DSch = DenormalizeSchema();
817 
818  TInt L = Sch.Len();
819  // print title (schema)
820  fprintf(F, "# ");
821  for (TInt i = 0; i < L-1; i++) {
822  fprintf(F, "%s\t", DSch[i].Val1.CStr());
823  }
824  fprintf(F, "%s\n", DSch[L-1].Val1.CStr());
825  // print table contents
826  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
827  for (TInt i = 0; i < L; i++) {
828  char C = (i == L-1) ? '\n' : '\t';
829  switch (GetSchemaColType(i)) {
830  case atInt: {
831  fprintf(F, "%d%c", RowI.GetIntAttr(GetSchemaColName(i)).Val, C);
832  break;
833  }
834  case atFlt: {
835  fprintf(F, "%f%c", RowI.GetFltAttr(GetSchemaColName(i)).Val, C);
836  break;
837  }
838  case atStr: {
839  fprintf(F, "%s%c", RowI.GetStrAttr(GetSchemaColName(i)).CStr(), C);
840  break;
841  }
842  }
843  }
844  }
845 #endif
846  fclose(F);
847 }
848 
849 void TTable::SaveBin(const TStr& OutFNm) {
850  TFOut SOut(OutFNm);
851  Save(SOut);
852 }
853 
854 void TTable::Save(TSOut& SOut) {
855  NumRows.Save(SOut);
856  NumValidRows.Save(SOut);
857  FirstValidRow.Save(SOut);
858  LastValidRow.Save(SOut);
859  Next.Save(SOut);
860  IntCols.Save(SOut);
861  FltCols.Save(SOut);
862  StrColMaps.Save(SOut);
863 
864  THash<TStr,TPair<TInt,TInt> > ColTypeIntMap;
865  TInt atIntVal = TInt(0);
866  TInt atFltVal = TInt(1);
867  TInt atStrVal = TInt(2);
868  for (THash<TStr,TPair<TAttrType,TInt> >::TIter it = ColTypeMap.BegI(); it < ColTypeMap.EndI(); it++) {
869  TPair<TAttrType,TInt> dat = it.GetDat();
870  TStr DColName = DenormalizeColName(it.GetKey());
871  switch (dat.GetVal1()) {
872  case atInt:
873  ColTypeIntMap.AddDat(DColName, TPair<TInt,TInt>(atIntVal, dat.GetVal2()));
874  break;
875  case atFlt:
876  ColTypeIntMap.AddDat(DColName, TPair<TInt,TInt>(atFltVal, dat.GetVal2()));
877  break;
878  case atStr:
879  ColTypeIntMap.AddDat(DColName, TPair<TInt,TInt>(atStrVal, dat.GetVal2()));
880  break;
881  }
882  }
883  ColTypeIntMap.Save(SOut);
884  SOut.Flush();
885 }
886 
887 void TTable::Dump(FILE *OutF) const {
888  TInt L = Sch.Len();
889  Schema DSch = DenormalizeSchema();
890 
891  // LoadSS() will not throw away lines with #
892  //fprintf(OutF, "# Table: rows: %d, columns: %d\n", GetNumValidRows(), GetNodes());
893  // print title (schema), LoadSS() will take first line as (optional) schema
894  fprintf(OutF, "# ");
895  for (TInt i = 0; i < L-1; i++) {
896  fprintf(OutF, "%s\t", DSch[i].Val1.CStr());
897  }
898  fprintf(OutF, "%s\n", DSch[L-1].Val1.CStr());
899  // print table contents
900  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
901  for (TInt i = 0; i < L; i++) {
902  char C = (i == L-1) ? '\n' : '\t';
903  switch (GetSchemaColType(i)) {
904  case atInt: {
905  fprintf(OutF, "%d%c", RowI.GetIntAttr(GetSchemaColName(i)).Val, C);
906  break;
907  }
908  case atFlt: {
909  fprintf(OutF, "%f%c", RowI.GetFltAttr(GetSchemaColName(i)).Val, C);
910  break;
911  }
912  case atStr: {
913  fprintf(OutF, "%s%c", RowI.GetStrAttr(GetSchemaColName(i)).CStr(), C);
914  break;
915  }
916  }
917  }
918  }
919 }
920 
922  TInt L = Sch.Len();
923 
924 #if 0
925  // print table on the input, iterate over all columns
926  for (TInt i = 0; i < L; i++) {
927  // skip non-string columns
928  if (GetSchemaColType(i) != atStr) {
929  continue;
930  }
931 
932  TInt ColIdx = GetColIdx(GetSchemaColName(i));
933 
934  // iterate over all rows
935  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
936  TInt RowIdx = RowI.GetRowIdx();
937  TInt KeyId = StrColMaps[ColIdx][RowIdx];
938  printf("ChangeContext in %d %d %d .%s.\n",
939  ColIdx.Val, RowIdx.Val, KeyId.Val, GetStrVal(ColIdx, RowIdx).CStr());
940  }
941  }
942 #endif
943 
944  // add strings to the new context, change values
945  // iterate over all columns
946  for (TInt i = 0; i < L; i++) {
947  // skip non-string columns
948  if (GetSchemaColType(i) != atStr) {
949  continue;
950  }
951 
952  TInt ColIdx = GetColIdx(GetSchemaColName(i));
953 
954  // iterate over all rows
955  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
956  TInt RowIdx = RowI.GetRowIdx();
957  // get the string
958  TStr Key = GetStrValIdx(ColIdx, RowIdx);
959  // add the string to the new context
960  TInt KeyId = TInt(NewContext->StringVals.AddKey(Key));
961  // change the value in the table
962  StrColMaps[ColIdx][RowIdx] = KeyId;
963  }
964  }
965 
966  // set the new context
967  Context = NewContext;
968  return Context;
969 }
970 
971 void TTable::AddStrVal(const TInt& ColIdx, const TStr& Key) {
972  TInt KeyId = TInt(Context->StringVals.AddKey(Key));
973  //printf("TTable::AddStrVal2 %d .%s. %d\n", ColIdx.Val, Key.CStr(), KeyId.Val);
974  StrColMaps[ColIdx].Add(KeyId);
975 }
976 
977 void TTable::AddStrVal(const TStr& Col, const TStr& Key) {
978  if (GetColType(Col) != atStr) {
979  TExcept::Throw(Col + " is not a string valued column");
980  }
981  //printf("TTable::AddStrVal1 .%s. .%s.\n", Col.CStr(), Key.CStr());
982  AddStrVal(GetColIdx(Col), Key);
983 }
984 
985 void TTable::AddGraphAttribute(const TStr& Attr, TBool IsEdge, TBool IsSrc, TBool IsDst) {
986  if (!IsColName(Attr)) { TExcept::Throw(Attr + ": No such column"); }
987  if (IsEdge) { EdgeAttrV.Add(NormalizeColName(Attr)); }
988  if (IsSrc) { SrcNodeAttrV.Add(NormalizeColName(Attr)); }
989  if (IsDst) { DstNodeAttrV.Add(NormalizeColName(Attr)); }
990 }
991 
992 void TTable::AddGraphAttributeV(TStrV& Attrs, TBool IsEdge, TBool IsSrc, TBool IsDst) {
993  for (TInt i = 0; i < Attrs.Len(); i++) {
994  if (!IsColName(Attrs[i])) {
995  TExcept::Throw(Attrs[i] + ": no such column");
996  }
997  }
998  for (TInt i = 0; i < Attrs.Len(); i++) {
999  if (IsEdge) { EdgeAttrV.Add(NormalizeColName(Attrs[i])); }
1000  if (IsSrc) { SrcNodeAttrV.Add(NormalizeColName(Attrs[i])); }
1001  if (IsDst) { DstNodeAttrV.Add(NormalizeColName(Attrs[i])); }
1002  }
1003 }
1004 
1006  TStrV IntNA = TStrV(IntCols.Len(),0);
1007  for (TInt i = 0; i < SrcNodeAttrV.Len(); i++) {
1008  TStr Attr = SrcNodeAttrV[i];
1009  if (GetColType(Attr) == atInt) {
1010  IntNA.Add(Attr);
1011  }
1012  }
1013  return IntNA;
1014 }
1015 
1017  TStrV IntNA = TStrV(IntCols.Len(),0);
1018  for (TInt i = 0; i < DstNodeAttrV.Len(); i++) {
1019  TStr Attr = DstNodeAttrV[i];
1020  if (GetColType(Attr) == atInt) {
1021  IntNA.Add(Attr);
1022  }
1023  }
1024  return IntNA;
1025 }
1026 
1028  TStrV IntEA = TStrV(IntCols.Len(),0);
1029  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
1030  TStr Attr = EdgeAttrV[i];
1031  if (GetColType(Attr) == atInt) {
1032  IntEA.Add(Attr);
1033  }
1034  }
1035  return IntEA;
1036 }
1037 
1039  TStrV FltNA = TStrV(FltCols.Len(),0);
1040  for (TInt i = 0; i < SrcNodeAttrV.Len(); i++) {
1041  TStr Attr = SrcNodeAttrV[i];
1042  if (GetColType(Attr) == atFlt) {
1043  FltNA.Add(Attr);
1044  }
1045  }
1046  return FltNA;
1047 }
1048 
1050  TStrV FltNA = TStrV(FltCols.Len(),0);
1051  for (TInt i = 0; i < DstNodeAttrV.Len(); i++) {
1052  TStr Attr = DstNodeAttrV[i];
1053  if (GetColType(Attr) == atFlt) {
1054  FltNA.Add(Attr);
1055  }
1056  }
1057  return FltNA;
1058 }
1059 
1061  TStrV FltEA = TStrV(FltCols.Len(),0);;
1062  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
1063  TStr Attr = EdgeAttrV[i];
1064  if (GetColType(Attr) == atFlt) {
1065  FltEA.Add(Attr);
1066  }
1067  }
1068  return FltEA;
1069 }
1070 
1072  TStrV StrNA = TStrV(StrColMaps.Len(),0);
1073  for (TInt i = 0; i < SrcNodeAttrV.Len(); i++) {
1074  TStr Attr = SrcNodeAttrV[i];
1075  if (GetColType(Attr) == atStr) {
1076  StrNA.Add(Attr);
1077  }
1078  }
1079  return StrNA;
1080 }
1081 
1083  TStrV StrNA = TStrV(StrColMaps.Len(),0);
1084  for (TInt i = 0; i < DstNodeAttrV.Len(); i++) {
1085  TStr Attr = DstNodeAttrV[i];
1086  if (GetColType(Attr) == atStr) {
1087  StrNA.Add(Attr);
1088  }
1089  }
1090  return StrNA;
1091 }
1092 
1093 
1095  TStrV StrEA = TStrV(StrColMaps.Len(),0);
1096  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
1097  TStr Attr = EdgeAttrV[i];
1098  if (GetColType(Attr) == atStr) {
1099  StrEA.Add(Attr);
1100  }
1101  }
1102  return StrEA;
1103 }
1104 
1105 void TTable::Rename(const TStr& column, const TStr& NewLabel) {
1106  // This function is necessary, for example to take the union of two tables
1107  // where the attribute names don't match.
1108  if (!IsColName(column)) { TExcept::Throw("no such column " + column); }
1109  TPair<TAttrType,TInt> ColVal = GetColTypeMap(column);
1110  DelColType(column);
1111  AddColType(NewLabel, ColVal);
1112  TStr NColName = NormalizeColName(column);
1113  TStr NLabel = NormalizeColName(NewLabel);
1114  for (TInt c = 0; c < Sch.Len(); c++) {
1115  if (Sch[c].Val1 == NColName) {
1116  Sch.SetVal(c, TPair<TStr, TAttrType>(NLabel, Sch[c].Val2));
1117  break;
1118  }
1119  }
1120 }
1121 
1123  if (FirstValidRow == LastValidRow) {
1124  LastValidRow = -1;
1125  }
1126 
1127  TInt Old = FirstValidRow;
1129  Next[Old] = TTable::Invalid;
1130  NumValidRows--;
1131  TInt IdColIdx = GetColIdx(GetIdColName());
1132  RowIdMap.AddDat(IntCols[IdColIdx][Old], Invalid);
1133 }
1134 
1135 void TTable::RemoveRow(TInt RowIdx, TInt PrevRowIdx) {
1136  if (RowIdx == FirstValidRow) {
1137  RemoveFirstRow();
1138  return;
1139  }
1140  Assert(RowIdx != TTable::Invalid);
1141  if (RowIdx == TTable::Last) { return; }
1142  Next[PrevRowIdx] = Next[RowIdx];
1143  if (LastValidRow == RowIdx) {
1144  LastValidRow = RowIdx;
1145  }
1146  Next[RowIdx] = TTable::Invalid;
1147  NumValidRows--;
1148  TInt IdColIdx = GetColIdx(GetIdColName());
1149  RowIdMap.AddDat(IntCols[IdColIdx][RowIdx], Invalid);
1150 }
1151 
// Keeps only the rows whose physical indices appear in KeepV and removes the
// other rows visited by RowI. KeepV is first copied into a hash set for
// constant-time membership tests. The walk inspects RowI.GetNextRowIdx() and
// either advances RowI or calls RemoveNext(), so RowI sits just before the
// row under test. Once KeepV.Len() rows have been kept, all remaining rows
// are removed. The last element of KeepV becomes LastValidRow, so KeepV is
// presumably expected to list rows in iteration order — TODO confirm.
1152 void TTable::KeepSortedRows(const TIntV& KeepV) {
1153  TIntIntH KeepH(KeepV.Len());
1154  for (TInt i = 0; i < KeepV.Len(); i++) {
1155  KeepH.AddKey(KeepV[i]);
1156  }
1157 
1159  TInt KeepSize = 0;
1160  while (RowI.GetNextRowIdx() != Last) {
1161  if (KeepSize < KeepV.Len()) {
1162  if (KeepH.IsKey(RowI.GetNextRowIdx())) {
1163  KeepSize++;
1164  RowI++;
1165  } else {
1166  RowI.RemoveNext();
1167  }
1168  } else {
1169  // Covered all of KeepV. Remove the rest of the rows.
1170  // Current RowI.CurrRowIdx is the last element of KeepV.
1171  RowI.RemoveNext();
1172  }
1173  }
// NOTE(review): an empty KeepV makes this read KeepV[-1]. Unique() on an
// empty table presumably passes an empty vector — confirm and guard.
1174  LastValidRow = KeepV[KeepV.Len()-1];
1175 }
1176 
1177 void TTable::GetPartitionRanges(TIntPrV& Partitions, TInt NumPartitions) const {
1178  TInt PartitionSize = NumValidRows / (NumPartitions);
1179  if (NumValidRows % NumPartitions != 0) PartitionSize++;
1180  if (PartitionSize < 10) {
1181  PartitionSize = 10;
1182  NumPartitions = NumValidRows / PartitionSize;
1183  }
1184  Partitions.Reserve(NumPartitions+1);
1185 
1186  TInt currRow = FirstValidRow;
1187  TInt currStart = currRow;
1188  if (IsNextDirty) {
1189  TInt currCount = PartitionSize;
1190  while (currRow != TTable::Last) {
1191  if (currCount == 0) {
1192  Partitions.Add(TIntPr(currStart, currRow));
1193  currStart = currRow;
1194  currCount = PartitionSize;
1195  }
1196  currRow = Next[currRow];
1197  currCount--;
1198  }
1199  Partitions.Add(TIntPr(currStart, currRow));
1200  } else {
1201  // Optimize for the case when rows are logically in sequence.
1202  currRow += PartitionSize;
1203  while (currRow != TTable::Last && currRow < Next.Len()) {
1204  if (Next[currRow] == TTable::Invalid) { currRow++; continue; }
1205  Partitions.Add(TIntPr(currStart, currRow));
1206  currStart = currRow;
1207  currRow += PartitionSize;
1208  }
1209  Partitions.Add(TIntPr(currStart, TTable::Last));
1210  }
1211  //printf("Num partitions: %d\n", Partitions.Len());
1212 }
1213 
1214 /***** Grouping Utility functions ****/
1215 void TTable::GroupingSanityCheck(const TStr& GroupBy, const TAttrType& AttrType) const {
1216  if (!IsColName(GroupBy)) {
1217  TExcept::Throw("no such column " + GroupBy);
1218  }
1219  if (GetColType(GroupBy) != AttrType) {
1220  TExcept::Throw(GroupBy + " values are not of expected type");
1221  }
1222 }
1223 
#ifdef GCC_ATOMIC
// Groups the valid rows by the values of the integer column GroupBy, in
// parallel over row ranges (OpenMP).
//  - Grouping: output hash, sized to NumValidRows; each row is passed to
//    UpdateGrouping with its GroupBy value and its id (presumably appended to
//    that value's row list — see UpdateGrouping).
//  - UsePhysicalIds: if true, ids are physical row indices; otherwise they
//    are values of the id column.
// Throws TExcept if ids are requested but there is no id column, or if
// GroupBy is not an existing integer column.
void TTable::GroupByIntColMP(const TStr& GroupBy, THashMP<TInt, TIntV>& Grouping, TBool UsePhysicalIds) const {
  TInt IdColIdx = GetColIdx(IdColName);
  TInt GroupByColIdx = GetColIdx(GroupBy);
  if(!UsePhysicalIds && IdColIdx < 0){
    TExcept::Throw("Grouping: Either use physical row ids, or have an id column");
  }
  GroupingSanityCheck(GroupBy, atInt);
  // Split the rows into ranges that the loop below distributes over threads.
  TIntPrV Partitions;
  GetPartitionRanges(Partitions, 8*CHUNKS_PER_THREAD);

  Grouping.Gen(NumValidRows);
  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
  for (int i = 0; i < Partitions.Len(); i++){
    // A partition covers rows from its start up to, excluding, its end.
    TRowIterator RowI(Partitions[i].GetVal1(), this);
    TRowIterator EndI(Partitions[i].GetVal2(), this);
    while (RowI < EndI) {
      TInt idx = UsePhysicalIds ? RowI.GetRowIdx() : RowI.GetIntAttr(IdColIdx);
      UpdateGrouping<TInt>(Grouping, RowI.GetIntAttr(GroupByColIdx), idx);
      RowI++;
    }
  }
}
#endif // GCC_ATOMIC
1265 
1266 void TTable::Unique(const TStr& Col) {
1267  TIntV RemainingRows;
1268  TStr NCol = NormalizeColName(Col);
1269  switch (GetColType(NCol)) {
1270  case atInt: {
1271  TIntIntVH Grouping;
1272  GroupByIntCol(NCol, Grouping, TIntV(), true, true);
1273  for (TIntIntVH::TIter it = Grouping.BegI(); it < Grouping.EndI(); it++) {
1274  RemainingRows.Add(it->Dat[0]);
1275  }
1276  break;
1277  }
1278  case atFlt: {
1279  THash<TFlt,TIntV> Grouping;
1280  GroupByFltCol(NCol, Grouping, TIntV(), true, true);
1281  for (THash<TFlt,TIntV>::TIter it = Grouping.BegI(); it < Grouping.EndI(); it++) {
1282  RemainingRows.Add(it->Dat[0]);
1283  }
1284  break;
1285  }
1286  case atStr: {
1287  TIntIntVH Grouping;
1288  GroupByStrCol(NCol, Grouping, TIntV(), true, true);
1289  for (TIntIntVH::TIter it = Grouping.BegI(); it < Grouping.EndI(); it++) {
1290  RemainingRows.Add(it->Dat[0]);
1291  }
1292  break;
1293  }
1294  }
1295  KeepSortedRows(RemainingRows);
1296 }
1297 
// Removes rows that duplicate an earlier row on the columns Cols.
// A single column is delegated to Unique(const TStr&). Otherwise GroupAux is
// run with KeepUnique = true and UsePhysicalIds = true, which fills UniqueVec
// with the physical index of the first row of each distinct key.
// KeepSortedRows then drops every other row.
// Ordered: if false, GroupAux sorts a key's values per type, so column order
// within a type does not matter.
1298 void TTable::Unique(const TStrV& Cols, TBool Ordered) {
1299  if(Cols.Len() == 1){
1300  Unique(Cols[0]);
1301  return;
1302  }
1303  TStrV NCols = NormalizeColNameV(Cols);
1305  TIntV UniqueVec;
1306  GroupAux(NCols, Grouping, Ordered, "", true, UniqueVec, true);
1307  KeepSortedRows(UniqueVec);
1308 }
1309 
// Stores, in the int column GroupColName, the group id of every row listed in
// GroupAndRowIds. Each pair is (group id, physical row index), as GroupAux
// produces them. The column is registered at int index IntCols.Len()-1, so it
// is assumed to be the most recently added int column — TODO confirm.
// The schema entry is not added here; GroupAux adds it after this call.
1310 void TTable::StoreGroupCol(const TStr& GroupColName, const TVec<TPair<TInt, TInt> >& GroupAndRowIds) {
1311  // Add a column where the value of the i'th row is the group id of row i.
1313  TInt L = IntCols.Len();
1314  AddColType(GroupColName, atInt, L-1);
1315  // Store group id for each row.
1316  for (TInt i = 0; i < GroupAndRowIds.Len(); i++) {
1317  IntCols[L-1][GroupAndRowIds[i].Val2] = GroupAndRowIds[i].Val1;
1318  }
1319 }
1320 
1321 // Core crouping logic.
1322 void TTable::GroupAux(const TStrV& GroupBy, THash<TGroupKey, TPair<TInt, TIntV> >& Grouping,
1323  TBool Ordered, const TStr& GroupColName, TBool KeepUnique, TIntV& UniqueVec, TBool UsePhysicalIds) {
1324  TInt IdColIdx = GetColIdx(IdColName);
1325  if(!UsePhysicalIds && IdColIdx < 0){
1326  TExcept::Throw("Grouping: Either use physical row ids, or have an id column");
1327  }
1328  TIntV IntGroupByCols;
1329  TIntV FltGroupByCols;
1330  TIntV StrGroupByCols;
1331  // get indices for each column type
1332  for (TInt c = 0; c < GroupBy.Len(); c++) {
1333  //printf("GroupBy col %d: %s\n", c.Val, GroupBy[c].CStr());
1334  if (!IsColName(GroupBy[c])) {
1335  TExcept::Throw("no such column " + GroupBy[c]);
1336  }
1337 
1338  TPair<TAttrType, TInt> ColType = GetColTypeMap(GroupBy[c]);
1339  switch (ColType.Val1) {
1340  case atInt:
1341  IntGroupByCols.Add(ColType.Val2);
1342  break;
1343  case atFlt:
1344  FltGroupByCols.Add(ColType.Val2);
1345  break;
1346  case atStr:
1347  StrGroupByCols.Add(ColType.Val2);
1348  break;
1349  }
1350  }
1351 
1352  TInt IKLen = IntGroupByCols.Len();
1353  TInt FKLen = FltGroupByCols.Len();
1354  TInt SKLen = StrGroupByCols.Len();
1355 
1356  TInt GroupNum = 0;
1357  TVec<TPair<TInt, TInt> > GroupAndRowIds;
1358  //printf("done GroupAux initialization\n");
1359 
1360  // iterate over rows
1361  for (TRowIterator it = BegRI(); it < EndRI(); it++) {
1362  TIntV IKey(IKLen + SKLen, 0);
1363  TFltV FKey(FKLen, 0);
1364  TIntV SKey(SKLen, 0);
1365 
1366  // find group key
1367  for (TInt c = 0; c < IKLen; c++) {
1368  IKey.Add(it.GetIntAttr(IntGroupByCols[c]));
1369  }
1370  for (TInt c = 0; c < FKLen; c++) {
1371  FKey.Add(it.GetFltAttr(FltGroupByCols[c]));
1372  }
1373  for (TInt c = 0; c < SKLen; c++) {
1374  SKey.Add(it.GetStrMapById(StrGroupByCols[c]));
1375  }
1376  if (!Ordered) {
1377  if (IKLen > 0) { IKey.ISort(0, IKey.Len()-1, true); }
1378  if (FKLen > 0) { FKey.ISort(0, FKey.Len()-1, true); }
1379  if (SKLen > 0) { SKey.ISort(0, SKey.Len()-1, true); }
1380  }
1381  for (TInt c = 0; c < SKLen; c++) {
1382  IKey.Add(SKey[c]);
1383  }
1384 
1385  // look for group matching the key
1386  TGroupKey GroupKey = TGroupKey(IKey, FKey);
1387 
1388  TInt RowIdx = it.GetRowIdx();
1389  TInt idx = UsePhysicalIds ? it.GetRowIdx() : IntCols[IdColIdx][it.GetRowIdx()];
1390  if (!Grouping.IsKey(GroupKey)) {
1391  // Grouping key hasn't been seen before, create a new group
1392  TPair<TInt, TIntV> NewGroup;
1393  NewGroup.Val1 = GroupNum;
1394  NewGroup.Val2.Add(idx);
1395  Grouping.AddDat(GroupKey, NewGroup);
1396  if (GroupColName != "") {
1397  GroupAndRowIds.Add(TPair<TInt, TInt>(GroupNum, RowIdx));
1398  }
1399  if (KeepUnique) {
1400  UniqueVec.Add(idx);
1401  }
1402  GroupNum++;
1403  } else {
1404  // Grouping key has been seen before, update corresponding group
1405  if (!KeepUnique) {
1406  TPair<TInt, TIntV>& NewGroup = Grouping.GetDat(GroupKey);
1407  NewGroup.Val2.Add(idx);
1408  if (GroupColName != "") {
1409  GroupAndRowIds.Add(TPair<TInt, TInt>(NewGroup.Val1, RowIdx));
1410  }
1411  }
1412  }
1413  }
1414  // printf("KeepUnique: %d\n", KeepUnique.Val);
1415  // update group mapping
1416  if (!KeepUnique) {
1417  GroupStmt Stmt(NormalizeColNameV(GroupBy), Ordered, UsePhysicalIds);
1418  GroupStmtNames.AddDat(GroupColName, Stmt);
1419  GroupIDMapping.AddKey(Stmt);
1420  GroupMapping.AddKey(Stmt);
1421  //printf("Adding statement: ");
1422  //Stmt.Print();
1423  for (THash<TGroupKey, TPair<TInt, TIntV> >::TIter it = Grouping.BegI(); it < Grouping.EndI(); it++) {
1424  TGroupKey key = it.GetKey();
1425  TPair<TInt, TIntV> group = it.GetDat();
1426  GroupIDMapping.GetDat(Stmt).AddDat(group.Val1, TGroupKey(key));
1427  GroupMapping.GetDat(Stmt).AddDat(TGroupKey(key), TIntV(group.Val2));
1428  }
1429  }
1430 
1431  // add a column to the table
1432  if (GroupColName != "") {
1433  StoreGroupCol(GroupColName, GroupAndRowIds);
1434  AddSchemaCol(GroupColName, atInt); // update schema
1435  }
1436 }
1437 
1438 /*
1439 // Core grouping logic.
1440 #ifdef USE_OPENMP
1441 void TTable::GroupAuxMP(const TStrV& GroupBy, THashGenericMP<TGroupKey, TPair<TInt, TIntV> >& Grouping,
1442  TBool Ordered, const TStr& GroupColName, TBool KeepUnique, TIntV& UniqueVec, TBool UsePhysicalIds) {
1443  //double startFn = omp_get_wtime();
1444  TIntV IntGroupByCols;
1445  TIntV FltGroupByCols;
1446  TIntV StrGroupByCols;
1447  // get indices for each column type
1448  for (TInt c = 0; c < GroupBy.Len(); c++) {
1449  if (!IsColName(GroupBy[c])) {
1450  TExcept::Throw("no such column " + GroupBy[c]);
1451  }
1452 
1453  TPair<TAttrType, TInt> ColType = GetColTypeMap(GroupBy[c]);
1454  switch (ColType.Val1) {
1455  case atInt:
1456  IntGroupByCols.Add(ColType.Val2);
1457  break;
1458  case atFlt:
1459  FltGroupByCols.Add(ColType.Val2);
1460  break;
1461  case atStr:
1462  StrGroupByCols.Add(ColType.Val2);
1463  break;
1464  }
1465  }
1466 
1467  TInt IKLen = IntGroupByCols.Len();
1468  TInt FKLen = FltGroupByCols.Len();
1469  TInt SKLen = StrGroupByCols.Len();
1470 
1471  TInt GroupNum = 0;
1472  TInt IdColIdx = GetColIdx(IdColName);
1473 
1474  //double endInit = omp_get_wtime();
1475  //printf("Init time = %f\n", endInit-startFn);
1476 
1477  TVec<TPair<TInt, TInt> > GroupAndRowIds;
1478 
1479  // iterate over rows
1480  for (TRowIterator it = BegRI(); it < EndRI(); it++) {
1481  TIntV IKey(IKLen + SKLen, 0);
1482  TFltV FKey(FKLen, 0);
1483  TIntV SKey(SKLen, 0);
1484 
1485  // find group key
1486  for (TInt c = 0; c < IKLen; c++) {
1487  IKey.Add(it.GetIntAttr(IntGroupByCols[c]));
1488  }
1489  for (TInt c = 0; c < FKLen; c++) {
1490  FKey.Add(it.GetFltAttr(FltGroupByCols[c]));
1491  }
1492  for (TInt c = 0; c < SKLen; c++) {
1493  SKey.Add(it.GetStrMapById(StrGroupByCols[c]));
1494  }
1495  if (!Ordered) {
1496  if (IKLen > 0) { IKey.ISort(0, IKey.Len()-1, true); }
1497  if (FKLen > 0) { FKey.ISort(0, FKey.Len()-1, true); }
1498  if (SKLen > 0) { SKey.ISort(0, SKey.Len()-1, true); }
1499  }
1500  for (TInt c = 0; c < SKLen; c++) {
1501  IKey.Add(SKey[c]);
1502  }
1503 
1504  // look for group matching the key
1505  TGroupKey GroupKey = TGroupKey(IKey, FKey);
1506 
1507  TInt RowIdx = it.GetRowIdx();
1508  if (!Grouping.IsKey(GroupKey)) {
1509  // Grouping key hasn't been seen before, create a new group
1510  TPair<TInt, TIntV> NewGroup;
1511  NewGroup.Val1 = GroupNum;
1512  if(IdColIdx > 0){
1513  NewGroup.Val2.Add(IntCols[IdColIdx][RowIdx]);
1514  }
1515  Grouping.AddDat(GroupKey, NewGroup);
1516  if (GroupColName != "") {
1517  GroupAndRowIds.Add(TPair<TInt, TInt>(GroupNum, RowIdx));
1518  }
1519  if (KeepUnique) {
1520  UniqueVec.Add(RowIdx);
1521  }
1522  GroupNum++;
1523  } else {
1524  // Grouping key has been seen before, update corresponding group
1525  if (!KeepUnique) {
1526  TPair<TInt, TIntV>& NewGroup = Grouping.GetDat(GroupKey);
1527  if(IdColIdx > 0){
1528  NewGroup.Val2.Add(IntCols[IdColIdx][RowIdx]);
1529  }
1530  if (GroupColName != "") {
1531  GroupAndRowIds.Add(TPair<TInt, TInt>(NewGroup.Val1, RowIdx));
1532  }
1533  }
1534  }
1535  }
1536 
1537  //double endIter = omp_get_wtime();
1538  //printf("Iter time = %f\n", endIter-endInit);
1539 
1540  // update group mapping
1541  if (!KeepUnique) {
1542  TPair<TStrV, TBool> GroupStmt(GroupBy, Ordered);
1543  GroupStmtNames.AddDat(GroupColName, GroupStmt);
1544  GroupIDMapping.AddDat(GroupStmt);
1545  GroupMapping.AddDat(GroupStmt);
1546  for (THash<TGroupKey, TPair<TInt, TIntV> >::TIter it = Grouping.BegI(); it < Grouping.EndI(); it++) {
1547  TGroupKey key = it.GetKey();
1548  TPair<TInt, TIntV> group = it.GetDat();
1549  GroupIDMapping.GetDat(GroupStmt).AddDat(group.Val1, key);
1550  GroupMapping.GetDat(GroupStmt).AddDat(key, group.Val2);
1551  }
1552  }
1553 
1554  //double endMapping = omp_get_wtime();
1555  //printf("Mapping time = %f\n", endMapping-endIter);
1556 
1557  // add a column to the table
1558  if (GroupColName != "") {
1559  StoreGroupCol(GroupColName, GroupAndRowIds);
1560  AddSchemaCol(GroupColName, atInt); // update schema
1561  }
1562 
1563  //double endStore = omp_get_wtime();
1564  //printf("Store time = %f\n", endStore-endMapping);
1565 }
1566 #endif // USE_OPENMP
1567 */
1568 
// Groups the rows by the columns GroupBy (normalized first) via GroupAux with
// KeepUnique = false, so the grouping is recorded on the table. Each row's
// group number is stored in a new int column named after GroupColName;
// GroupAux adds that column only for a non-empty name.
// Ordered / UsePhysicalIds: forwarded to GroupAux.
1569 void TTable::Group(const TStrV& GroupBy, const TStr& GroupColName, TBool Ordered, TBool UsePhysicalIds) {
1570  TStrV NGroupBy = NormalizeColNameV(GroupBy);
1571  TStr NGroupColName = NormalizeColName(GroupColName);
1572  TIntV UniqueVec;
1574  GroupAux(NGroupBy, Grouping, Ordered, NGroupColName, false, UniqueVec, UsePhysicalIds);
1575 }
1576 
// NOTE(review): unimplemented stub — the body is only a TODO marker.
1578  //TODO
1579 }
1580 
// NOTE(review): unimplemented stub — the body is only a TODO marker.
1582  //TODO
1583 }
1584 
1585 void TTable::Aggregate(const TStrV& GroupByAttrs, TAttrAggr AggOp,
1586  const TStr& ValAttr, const TStr& ResAttr, TBool Ordered) {
1587 
1588  for (TInt c = 0; c < GroupByAttrs.Len(); c++) {
1589  if (!IsColName(GroupByAttrs[c])) {
1590  TExcept::Throw("no such column " + GroupByAttrs[c]);
1591  }
1592  }
1593 
1594  // double startFn = omp_get_wtime();
1595  TStrV NGroupByAttrs = NormalizeColNameV(GroupByAttrs);
1596  TBool UsePhysicalIds = (GetColIdx(IdColName) < 0);
1597 
1598  THash<TInt,TIntV> GroupByIntMapping;
1599  THash<TFlt,TIntV> GroupByFltMapping;
1600  THash<TInt,TIntV> GroupByStrMapping;
1601  THash<TGroupKey,TIntV> Mapping;
1602 #ifdef GCC_ATOMIC
1603  THashMP<TInt,TIntV> GroupByIntMapping_MP(NumValidRows);
1604  TIntV GroupByIntMPKeys(NumValidRows);
1605 #endif
1606  TInt NumOfGroups = 0;
1607  TInt GroupingCase = 0;
1608 
1609  // check if grouping already exists
1610  GroupStmt Stmt(NGroupByAttrs, Ordered, UsePhysicalIds);
1611  if (GroupMapping.IsKey(Stmt)) {
1612  Mapping = GroupMapping.GetDat(Stmt);
1613  } else{
1614  if(NGroupByAttrs.Len() == 1){
1615  switch(GetColType(NGroupByAttrs[0])){
1616  case atInt:
1617 #ifdef GCC_ATOMIC
1618  if(GetMP()){
1619  GroupByIntColMP(NGroupByAttrs[0], GroupByIntMapping_MP, UsePhysicalIds);
1620  int x = 0;
1621  for(THashMP<TInt,TIntV>::TIter it = GroupByIntMapping_MP.BegI(); it < GroupByIntMapping_MP.EndI(); it++){
1622  GroupByIntMPKeys[x] = it.GetKey();
1623  x++;
1624  /*
1625  printf("%d --> ", it.GetKey().Val);
1626  TIntV& V = it.GetDat();
1627  for(int i = 0; i < V.Len(); i++){
1628  printf(" %d", V[i].Val);
1629  }
1630  printf("\n");
1631  */
1632  }
1633  NumOfGroups = x;
1634  GroupingCase = 4;
1635  //printf("Number of groups: %d\n", NumOfGroups.Val);
1636  break;
1637  }
1638 #endif // GCC_ATOMIC
1639  GroupByIntCol(NGroupByAttrs[0], GroupByIntMapping, TIntV(), true, UsePhysicalIds);
1640  NumOfGroups = GroupByIntMapping.Len();
1641  GroupingCase = 1;
1642  break;
1643  case atFlt:
1644  GroupByFltCol(NGroupByAttrs[0], GroupByFltMapping, TIntV(), true, UsePhysicalIds);
1645  NumOfGroups = GroupByFltMapping.Len();
1646  GroupingCase = 2;
1647  break;
1648  case atStr:
1649  GroupByStrCol(NGroupByAttrs[0], GroupByStrMapping, TIntV(), true, UsePhysicalIds);
1650  NumOfGroups = GroupByStrMapping.Len();
1651  GroupingCase = 3;
1652  break;
1653  }
1654  }
1655  else{
1656  TIntV UniqueVector;
1658  GroupAux(NGroupByAttrs, Mapping_aux, Ordered, "", false, UniqueVector, UsePhysicalIds);
1659  for(THash<TGroupKey, TPair<TInt, TIntV> >::TIter it = Mapping_aux.BegI(); it < Mapping_aux.EndI(); it++){
1660  Mapping.AddDat(it.GetKey(), it.GetDat().Val2);
1661  }
1662  NumOfGroups = Mapping.Len();
1663  }
1664  }
1665 
1666  // double endGroup = omp_get_wtime();
1667  // printf("Group time = %f\n", endGroup-startFn);
1668 
1669  TAttrType T = GetColType(ValAttr);
1670 
1671  // add column corresponding to result attribute type
1672  if (AggOp == aaCount) { AddIntCol(ResAttr); }
1673  else {
1674  if (T == atInt) { AddIntCol(ResAttr); }
1675  else if (T == atFlt) { AddFltCol(ResAttr); }
1676  else {
1677  // Count is the only aggregation operation handled for Str
1678  TExcept::Throw("Invalid aggregation for Str type!");
1679  }
1680  }
1681  TInt ColIdx = GetColIdx(ResAttr);
1682  TInt AggrColIdx = GetColIdx(ValAttr);
1683 
1684  // double endAdd = omp_get_wtime();
1685  // printf("AddCol time = %f\n", endAdd-endGroup);
1686 
1687 #ifdef USE_OPENMP
1688  #pragma omp parallel for schedule(dynamic)
1689 #endif
1690  for (int g = 0; g < NumOfGroups; g++) {
1691  TIntV* GroupRows = NULL;
1692  switch(GroupingCase){
1693  case 0:
1694  GroupRows = & Mapping.GetDat(Mapping.GetKey(g));
1695  break;
1696  case 1:
1697  GroupRows = & GroupByIntMapping.GetDat(GroupByIntMapping.GetKey(g));
1698  break;
1699  case 2:
1700  GroupRows = & GroupByIntMapping.GetDat(GroupByIntMapping.GetKey(g));
1701  break;
1702  case 3:
1703  GroupRows = & GroupByStrMapping.GetDat(GroupByStrMapping.GetKey(g));
1704  break;
1705  case 4:
1706 #ifdef GCC_ATOMIC
1707  GroupRows = & GroupByIntMapping_MP.GetDat(GroupByIntMPKeys[g]);
1708 #endif
1709  break;
1710  }
1711 
1712  // find valid rows of group
1713  /*
1714  TIntV ValidRows;
1715  for (TInt i = 0; i < GroupRows.Len(); i++) {
1716  // TODO: This should not be necessary
1717  if (!RowIdMap.IsKey(GroupRows[i])) { continue; }
1718  TInt RowId = RowIdMap.GetDat(GroupRows[i]);
1719  // GroupRows has physical row indices
1720  if (RowId != Invalid) { ValidRows.Add(RowId); }
1721  }
1722  */
1723  TIntV& ValidRows = *GroupRows;
1724  TInt sz = ValidRows.Len();
1725  if (sz <= 0) continue;
1726  // Count is handled separately (other operations have aggregation policies defined in a template)
1727  if (AggOp == aaCount) {
1728  for (TInt i = 0; i < sz; i++) { IntCols[ColIdx][ValidRows[i]] = sz; }
1729  } else {
1730  // aggregate based on column type
1731  if (T == atInt) {
1732  TIntV V;
1733  for (TInt i = 0; i < sz; i++) { V.Add(IntCols[AggrColIdx][ValidRows[i]]); }
1734  TInt Res = AggregateVector<TInt>(V, AggOp);
1735  if (AggOp == aaMean) { Res = Res / sz; }
1736  for (TInt i = 0; i < sz; i++) { IntCols[ColIdx][ValidRows[i]] = Res; }
1737  } else {
1738  TFltV V;
1739  for (TInt i = 0; i < sz; i++) { V.Add(FltCols[AggrColIdx][ValidRows[i]]); }
1740  TFlt Res = AggregateVector<TFlt>(V, AggOp);
1741  if (AggOp == aaMean) { Res /= sz; }
1742  for (TInt i = 0; i < sz; i++) { FltCols[ColIdx][ValidRows[i]] = Res; }
1743  }
1744  }
1745  }
1746  // double endIter = omp_get_wtime();
1747  // printf("Iter time = %f\n", endIter-endAdd);
1748 }
1749 
1750 void TTable::AggregateCols(const TStrV& AggrAttrs, TAttrAggr AggOp, const TStr& ResAttr) {
1752  for (TInt i = 0; i < AggrAttrs.Len(); i++) {
1753  Info.Add(GetColTypeMap(AggrAttrs[i]));
1754  if (Info[i].Val1 != Info[0].Val1) {
1755  TExcept::Throw("AggregateCols: Aggregation attributes must have the same type");
1756  }
1757  }
1758 
1759  if (Info[0].Val1 == atInt) {
1760  AddIntCol(ResAttr);
1761  TInt ResIdx = GetColIdx(ResAttr);
1762 
1763  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
1764  TInt RowIdx = RI.GetRowIdx();
1765  TIntV V;
1766  for (TInt i = 0; i < AggrAttrs.Len(); i++) {
1767  V.Add(IntCols[Info[i].Val2][RowIdx]);
1768  }
1769  IntCols[ResIdx][RowIdx] = AggregateVector<TInt>(V, AggOp);
1770  }
1771  } else if (Info[0].Val1 == atFlt) {
1772  AddFltCol(ResAttr);
1773  TInt ResIdx = GetColIdx(ResAttr);
1774 
1775  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
1776  TInt RowIdx = RI.GetRowIdx();
1777  TFltV V;
1778  for (TInt i = 0; i < AggrAttrs.Len(); i++) {
1779  V.Add(FltCols[Info[i].Val2][RowIdx]);
1780  }
1781  FltCols[ResIdx][RowIdx] = AggregateVector<TFlt>(V, AggOp);
1782  }
1783  } else {
1784  TExcept::Throw("AggregateCols: Only Int and Flt aggregation supported right now");
1785  }
1786 }
1787 
// Debug helper: prints each group of Mapping on one line to stdout. Each line
// contains the int part of the key, then the float part, then "-->", then
// the group's row ids.
// NOTE(review): the key parts and the row vector are copied each iteration;
// const references would avoid that.
1789  for(THash<TGroupKey, TIntV>::TIter it = Mapping.BegI(); it < Mapping.EndI(); it++){
1790  TGroupKey gk = it.GetKey();
1791  TIntV ik = gk.Val1;
1792  TFltV fk = gk.Val2;
1793  for(int i = 0; i < ik.Len(); i++){ printf("%d ",ik[i].Val);}
1794  for(int i = 0; i < fk.Len(); i++){ printf("%f ",fk[i].Val);}
1795  printf("-->");
1796  TIntV v = it.GetDat();
1797  for(int i = 0; i < v.Len(); i++){ printf("%d ",v[i].Val);}
1798  printf("\n");
1799  }
1800 }
1801 
1802 void TTable::Count(const TStr& CountColName, const TStr& Col) {
1803  TStrV GroupByAttrs;
1804  GroupByAttrs.Add(CountColName);
1805  Aggregate(GroupByAttrs, aaCount, "", Col);
1806 }
1807 
// Splits the table into one new table per group of rows that share the same
// values in the columns GroupBy. Grouping is done by GroupAux; Ordered has
// the same meaning there. Each result table uses this table's schema without
// the id column, receives its group's rows in group order, and gets fresh
// ids from InitIds(). Returns the tables in the iteration order of the
// grouping hash.
1808 TVec<PTable> TTable::SpliceByGroup(const TStrV& GroupBy, TBool Ordered) {
1809  TStrV NGroupBy = NormalizeColNameV(GroupBy);
1810  TIntV UniqueVec;
1812  TVec<PTable> Result;
1813 
1814  Schema NewSchema;
1815  for (TInt c = 0; c < Sch.Len(); c++) {
1816  if (Sch[c].Val1 != GetIdColName()) {
1817  NewSchema.Add(Sch[c]);
1818  }
1819  }
1820 
// NOTE(review): UsePhysicalIds takes GroupAux's default here, while the row
// ids are later converted through RowIdMap as permanent ids — confirm they agree.
1821  GroupAux(NGroupBy, Grouping, Ordered, "", false, UniqueVec);
1822 
// Counts the groups processed; not otherwise used.
1823  TInt cnt = 0;
1824  // iterate over groups
1825  for (THash<TGroupKey, TPair<TInt, TIntV> >::TIter it = Grouping.BegI(); it != Grouping.EndI(); it++) {
1826  PTable GroupTable = TTable::New(NewSchema, Context);
1827 
// For every schema column: its (type, index) in GroupTable (index -1 marks
// the id column, which is skipped) and its index in this table (V).
// NOTE(review): these do not depend on the group and could be computed once.
// NewSchema excludes the id column, yet GroupTable->GetColTypeMap is queried
// for it before the index is overwritten — verify GroupTable has that entry.
1828  TVec<TPair<TAttrType, TInt> > ColInfo;
1829  TIntV V;
1830  for (TInt i = 0; i < Sch.Len(); i++) {
1831  ColInfo.Add(GroupTable->GetColTypeMap(Sch[i].Val1));
1832  if (Sch[i].Val1 == IdColName()) {
1833  ColInfo[i].Val2 = -1;
1834  }
1835  V.Add(GetColIdx(Sch[i].Val1));
1836  }
1837 
1838  TIntV& Rows = it.GetDat().Val2;
1839 
1840  // iterate over rows in group
1841  for (TInt i = 0; i < Rows.Len(); i++) {
1842  // convert from permanent ID to row ID
1843  TInt RowIdx = RowIdMap.GetDat(Rows[i]);
1844 
1845  // iterate over schema
1846  for (TInt c = 0; c < Sch.Len(); c++) {
1847  TPair<TAttrType, TInt> Info = ColInfo[c];
1848  TInt ColIdx = Info.Val2;
1849 
1850  if (ColIdx == -1) { continue; }
1851 
1852  // add row to new group
1853  switch (Info.Val1) {
1854  case atInt:
1855  GroupTable->IntCols[ColIdx].Add(IntCols[V[c]][RowIdx]);
1856  break;
1857  case atFlt:
1858  GroupTable->FltCols[ColIdx].Add(FltCols[V[c]][RowIdx]);
1859  break;
1860  case atStr:
1861  GroupTable->StrColMaps[ColIdx].Add(StrColMaps[V[c]][RowIdx]);
1862  break;
1863  }
1864 
1865  }
// Append the new row to GroupTable's list of valid rows.
1866  if (GroupTable->LastValidRow >= 0) {
1867  GroupTable->Next[GroupTable->LastValidRow] = GroupTable->NumRows;
1868  }
1869  GroupTable->Next.Add(GroupTable->Last);
1870  GroupTable->LastValidRow = GroupTable->NumRows;
1871 
1872  GroupTable->NumRows++;
1873  GroupTable->NumValidRows++;
1874  }
1875  GroupTable->InitIds();
1876  Result.Add(GroupTable);
1877 
1878  cnt += 1;
1879  }
1880  return Result;
1881 }
1882 
// Sets the id column name to "_id".
1884  IdColName = "_id";
1885  //Assert(NumRows == NumValidRows);
1887 }
1888 
// Renumbers the id column: the valid rows get consecutive ids 0, 1, 2, ... in
// iteration order, and RowIdMap is rebuilt from scratch.
1890  RowIdMap.Clr();
1891  TInt IdColIdx = GetColIdx(IdColName);
1892  TInt IdCnt = 0;
1893  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
1894  IntCols[IdColIdx][RI.GetRowIdx()] = IdCnt;
// NOTE(review): this keys RowIdMap by physical row index with the id as the
// value. AddIdColumn adds (id -> row index), and RemoveRow looks entries up
// by id. The two directions only agree while ids equal row indices — verify
// which direction is intended.
1895  RowIdMap.AddDat(RI.GetRowIdx(), IdCnt);
1896  IdCnt++;
1897  }
1898 }
1899 
1900 void TTable::AddIdColumn(const TStr& ColName) {
1901  //printf("NumRows: %d\n", NumRows.Val);
1902  TInt IdCol = IntCols.Add();
1903  IntCols[IdCol].Reserve(NumRows, NumRows);
1904  //printf("IdCol Reserved\n");
1905  TInt IdCnt = 0;
1906  RowIdMap.Clr();
1907  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
1908  IntCols[IdCol][RI.GetRowIdx()] = IdCnt;
1909  RowIdMap.AddDat(IdCnt, RI.GetRowIdx());
1910  IdCnt++;
1911  }
1912  AddSchemaCol(ColName, atInt);
1913  AddColType(ColName, atInt, IntCols.Len()-1);
1914 }
1915 
// Creates an empty table whose columns are this table's columns followed by
// Table's columns, plus an int id column "_id" at the last int position.
// Column names go through JointTable->RenumberColName. Table's column indices
// are shifted by this table's number of int, float and string columns, so
// both sets fit in the joint column vectors.
1917  PTable JointTable = New(Context);
1918  JointTable->IntCols = TVec<TIntV>(IntCols.Len() + Table.IntCols.Len() + 1);
1919  JointTable->FltCols = TVec<TFltV>(FltCols.Len() + Table.FltCols.Len());
1920  JointTable->StrColMaps = TVec<TIntV>(StrColMaps.Len() + Table.StrColMaps.Len());
1921  for (TInt i = 0; i < Sch.Len(); i++) {
1922  TStr ColName = GetSchemaColName(i);
1923  TAttrType ColType = GetSchemaColType(i);
1924  TStr CName = JointTable->RenumberColName(ColName);
1925  TPair<TAttrType, TInt> TypeMap = GetColTypeMap(ColName);
1926  JointTable->AddColType(CName, TypeMap);
1927  //JointTable->AddLabel(CName, ColName);
1928  JointTable->AddSchemaCol(CName, ColType);
1929  }
1930  for (TInt i = 0; i < Table.Sch.Len(); i++) {
1931  TStr ColName = Table.GetSchemaColName(i);
1932  TAttrType ColType = Table.GetSchemaColType(i);
1933  TStr CName = JointTable->RenumberColName(ColName);
1934  TPair<TAttrType, TInt> NewDat = Table.GetColTypeMap(ColName);
1935  Assert(ColType == NewDat.Val1);
1936  // add offsets
1937  switch (NewDat.Val1) {
1938  case atInt:
1939  NewDat.Val2 += IntCols.Len();
1940  break;
1941  case atFlt:
1942  NewDat.Val2 += FltCols.Len();
1943  break;
1944  case atStr:
1945  NewDat.Val2 += StrColMaps.Len();
1946  break;
1947  }
1948  JointTable->AddColType(CName, NewDat);
1949  JointTable->AddSchemaCol(CName, ColType);
1950  }
// The id column occupies the extra int column reserved above.
1951  TStr IdColName = "_id";
1952  JointTable->AddColType(IdColName, atInt, IntCols.Len() + Table.IntCols.Len());
1953  JointTable->AddSchemaCol(IdColName, atInt);
1954  return JointTable;
1955 }
1956 
// Appends to this joint table one row made of row RowIdx1 of T1 followed by
// row RowIdx2 of T2. T2's columns are offset by T1's column counts per type.
// The new row's id, NumRows-1 after the increment, is written to the int
// column right after T1's and T2's int columns.
1957 void TTable::AddJointRow(const TTable& T1, const TTable& T2, TInt RowIdx1, TInt RowIdx2) {
1958  for (TInt i = 0; i < T1.IntCols.Len(); i++) {
1959  IntCols[i].Add(T1.IntCols[i][RowIdx1]);
1960  }
1961  for (TInt i = 0; i < T1.FltCols.Len(); i++) {
1962  FltCols[i].Add(T1.FltCols[i][RowIdx1]);
1963  }
1964  for (TInt i = 0; i < T1.StrColMaps.Len(); i++) {
1965  StrColMaps[i].Add(T1.StrColMaps[i][RowIdx1]);
1966  }
1967  TInt IntOffset = T1.IntCols.Len();
1968  TInt FltOffset = T1.FltCols.Len();
1969  TInt StrOffset = T1.StrColMaps.Len();
1970  for (TInt i = 0; i < T2.IntCols.Len(); i++) {
1971  IntCols[i+IntOffset].Add(T2.IntCols[i][RowIdx2]);
1972  }
1973  for (TInt i = 0; i < T2.FltCols.Len(); i++) {
1974  FltCols[i+FltOffset].Add(T2.FltCols[i][RowIdx2]);
1975  }
1976  for (TInt i = 0; i < T2.StrColMaps.Len(); i++) {
1977  StrColMaps[i+StrOffset].Add(T2.StrColMaps[i][RowIdx2]);
1978  }
1979  TInt IdOffset = IntOffset + T2.IntCols.Len();
1980  NumRows++;
1981  NumValidRows++;
// NOTE(review): the previous tail is linked to NumValidRows-1, which equals
// the new row's physical index only if no rows were removed from this table.
1982  if (!Next.Empty()) {
1983  Next[Next.Len()-1] = NumValidRows-1;
1985  }
1986  Next.Add(Last);
1988  IntCols[IdOffset].Add(NumRows-1);
1989 }
1990 
1994 PTable TTable::SimJoin(const TStrV& Cols1, const TTable& Table, const TStrV& Cols2, const TStr& DistanceColName, const TSimType& SimType, const TFlt& Threshold)
1995 {
1996  Assert(Cols1.Len() == Cols2.Len());
1997 
1998  if(Cols1.Len()!=Cols2.Len()){
1999  TExcept::Throw("Column vectors must match in type and length");
2000  }
2001 
2002  for (TInt i = 0; i < Cols1.Len(); i++) {
2003  if(!IsColName(Cols1[i]) || !Table.IsColName(Cols2[i])){
2004  TExcept::Throw("Column not found in Table");
2005  }
2006 
2007  TAttrType Type1 = GetColType(Cols1[i]);
2008  TAttrType Type2 = GetColType(Cols2[i]);
2009 
2010  if(Type1!=Type2){
2011  TExcept::Throw("Column types on the two tables must match.");
2012  }
2013 
2014  // When supporting more distance metrics, check if the types are supported for given metric.
2015  if((Type1!=atInt && Type1!=atFlt) || (Type2!=atInt && Type2!=atFlt)){
2016  TExcept::Throw("Column type not supported. Only Flt and Int column types are supported.");
2017  }
2018  }
2019 
2020  // Initialize Join table and add the similarity column
2021  PTable JointTable = InitializeJointTable(Table);
2022  TFltV DistanceV;
2023 
2024  // O(n^2): Parallelize
2025  for(TRowIterator RowI = this->BegRI(); RowI < this->EndRI(); RowI++) {
2026  for(TRowIterator RowI2 = Table.BegRI(); RowI2 < Table.EndRI(); RowI2++) {
2027  float distance = 0;
2028 
2029  switch(SimType)
2030  {
2031  // Calculate the distance metric
2032  case L2Norm:
2033  for(TInt i = 0; i < Cols1.Len(); i++) {
2034  float attrVal1, attrVal2;
2035  attrVal1 = GetColType(Cols1[i])==atInt ? (float)RowI.GetIntAttr(Cols1[i]) : (float)RowI.GetFltAttr(Cols1[i]);
2036  attrVal2 = Table.GetColType(Cols2[i])==atInt ? (float)RowI2.GetIntAttr(Cols2[i]) : (float)RowI2.GetFltAttr(Cols2[i]);
2037  distance += pow(attrVal1 - attrVal2, 2);
2038  }
2039 
2040  distance = sqrt(distance);
2041 
2042  if(distance<=Threshold){
2043  JointTable->AddJointRow(*this, Table, RowI.GetRowIdx(), RowI2.GetRowIdx());
2044  DistanceV.Add(distance);
2045  }
2046 
2047  // Add row to the joint table if distance <= Threshold
2048  break;
2049  // Haversine distance to calculate the distance between two points on Earth from latitude/longitude
2050  case Haversine:
2051  {
2052  if(Cols1.Len()!=2){
2053  TExcept::Throw("Haversine disance expects exactly two attributes - latitude and longitude - in that order.");
2054  }
2055 
2056  // Block to prevent cross-initialization error from compiler
2057  TFlt Radius = 6373; // km
2058  float Latitude1 = GetColType(Cols1[0])==atInt ? (float)RowI.GetIntAttr(Cols1[0]) : (float)RowI.GetFltAttr(Cols1[0]);
2059  float Latitude2 = Table.GetColType(Cols2[0])==atInt ? (float)RowI2.GetIntAttr(Cols2[0]) : (float)RowI2.GetFltAttr(Cols2[0]);
2060 
2061  float Longitude1 = GetColType(Cols1[1])==atInt ? (float)RowI.GetIntAttr(Cols1[1]) : (float)RowI.GetFltAttr(Cols1[1]);
2062  float Longitude2 = Table.GetColType(Cols2[1])==atInt ? (float)RowI2.GetIntAttr(Cols2[1]) : (float)RowI2.GetFltAttr(Cols2[1]);
2063 
2064  Latitude1 *= static_cast<float>(M_PI/180.0);
2065  Latitude2 *= static_cast<float>(M_PI/180.0);
2066  Longitude1 *= static_cast<float>(M_PI/180.0);
2067  Longitude2 *= static_cast<float>(M_PI/180.0);
2068 
2069  float dlon = Longitude2 - Longitude1;
2070  float dlat = Latitude2 - Latitude1;
2071  float a = pow(sin(dlat/2), 2) + cos(Latitude1)*cos(Latitude2)*pow(sin(dlon/2), 2);
2072  float c = 2*atan2(sqrt(a), sqrt(1-a));
2073  distance = (static_cast<float>(Radius.Val))*c;
2074 
2075  if(distance<=Threshold){
2076  JointTable->AddJointRow(*this, Table, RowI.GetRowIdx(), RowI2.GetRowIdx());
2077  DistanceV.Add(distance);
2078  }
2079  }
2080  break;
2081  case L1Norm:
2082  case Jaccard:
2083  TExcept::Throw("This distance metric is not supported");
2084  }
2085  }
2086  }
2087 
2088  // Add the value for the similarity column
2089  JointTable->StoreFltCol(DistanceColName, DistanceV);
2090  JointTable->InitIds();
2091  return JointTable;
2092 }
2093 
2094 PTable TTable::SelfSimJoinPerGroup(const TStr& GroupAttr, const TStr& SimCol, const TStr& DistanceColName, const TSimType& SimType, const TFlt& Threshold)
2095 {
2096  if(!IsColName(SimCol) || !IsColName(GroupAttr)){
2097  TExcept::Throw("No such column found in table");
2098  }
2099 
2100  PTable JointTable = New(Context);
2101  // Initialize the joint table - (GroupId1, GroupId2, Similarity)
2102  JointTable->IntCols = TVec<TIntV>(2);
2103  JointTable->FltCols = TVec<TFltV>(1);
2104 
2105  for(TInt i=0;i<2;i++){
2106  TInt Suffix = i+1;
2107  TStr CName = "GroupId_" + Suffix.GetStr();
2109  JointTable->AddColType(CName, Group);
2110  JointTable->AddSchemaCol(CName, atInt);
2111  }
2112 
2114  JointTable->AddColType(DistanceColName, Group);
2115  JointTable->AddSchemaCol(DistanceColName, atFlt);
2116 
2118 
2119  TAttrType attrType = GetColType(SimCol);
2120  TInt GroupColIdx = GetColIdx(GroupAttr);
2121  TInt SimColIdx = GetColIdx(SimCol);
2122 
2123  for (TRowIterator RowI = this->BegRI(); RowI < this->EndRI(); RowI++) {
2124  TInt GroupId = IntCols[GroupColIdx][RowI.GetRowIdx()];
2125 
2126  if(attrType==atInt || attrType==atStr)
2127  {
2128  if(!TIntHH.IsKey(GroupId)){
2130  TIntHH.AddDat(GroupId, TIntH);
2131  }
2132 
2133  THash<TInt, TInt>& TIntH = TIntHH.GetDat(GroupId);
2134  TInt SimAttrVal = (attrType==atInt ? IntCols[SimColIdx][RowI.GetRowIdx()] : StrColMaps[SimColIdx][RowI.GetRowIdx()]);
2135  TIntH.AddDat(SimAttrVal, 0);
2136  }
2137  else
2138  {
2139  TExcept::Throw("Attribute type not supported.");
2140  }
2141  }
2142 
2143  // Iterate through every pair of groups and calculate the distance
2144  for (THash<TInt, THash<TInt, TInt> >::TIter it1 = TIntHH.BegI(); it1 < TIntHH.EndI(); it1++) {
2145  THash<TInt, TInt> Vals1H = it1.GetDat();
2146  TInt GroupId1 = it1.GetKey();
2147 
2148  for (THash<TInt, THash<TInt, TInt> >::TIter it2 = TIntHH.BegI(); it2 < TIntHH.EndI(); it2++) {
2149  int intersectionCount = 0;
2150  TInt GroupId2 = it2.GetKey();
2151  THash<TInt, TInt> Vals2H = it2.GetDat();
2152 
2153  for(THash<TInt, TInt>::TIter it = Vals1H.BegI(); it < Vals1H.EndI(); it++)
2154  {
2155  TInt Val = it.GetKey();
2156  if(Vals2H.IsKey(Val)){
2157  intersectionCount+=1;
2158  }
2159  }
2160 
2161  int unionCount = Vals1H.Len() + Vals2H.Len() - intersectionCount;
2162  float distance = 1.0f - (float)intersectionCount/unionCount;
2163 
2164  // Add a new row to the JointTable
2165  if(distance<=Threshold){
2166  JointTable->IntCols[0].Add(GroupId1);
2167  JointTable->IntCols[1].Add(GroupId2);
2168  JointTable->FltCols[0].Add(distance);
2169  JointTable->IncrementNext();
2170  }
2171  }
2172  }
2173 
2174  JointTable->InitIds();
2175  return JointTable;
2176 }
2177 
2180 PTable TTable::SelfSimJoinPerGroup(const TStrV& GroupBy, const TStr& SimCol,
2181  const TStr& DistanceColName, const TSimType& SimType, const TFlt& Threshold) {
2182  TStrV NGroupBy = NormalizeColNameV(GroupBy);
2183  TStrV ProjectionV;
2184 
2185  // Only keep the GroupBy cols and the SimCol
2186  for(TInt i=0; i<GroupBy.Len(); i++)
2187  {
2188  ProjectionV.Add(GroupBy[i]);
2189  }
2190 
2191  ProjectionV.Add(SimCol);
2192  ProjectInPlace(ProjectionV);
2193 
2194  TStr CName = "Group";
2195  TIntV UniqueVec;
2197  GroupAux(NGroupBy, Grouping, false, CName, false, UniqueVec);
2198  PTable GroupJointTable = SelfSimJoinPerGroup(CName, SimCol, DistanceColName, SimType, Threshold);
2199  PTable JointTable = InitializeJointTable(*this);
2200 
2201  // Hash of groupid to any arbitrary row of that group. Arbitrary because the GroupBy
2202  // columns within that group are the same, so we can choose any one.
2203  THash<TInt, TInt> GroupIdH;
2204 
2205  for(THash<TGroupKey, TPair<TInt, TIntV> >::TIter it=Grouping.BegI(); it<Grouping.EndI(); it++)
2206  {
2207  TPair<TInt, TIntV> group = it.GetDat();
2208  TInt GroupNum = group.Val1;
2209  TIntV RowIds = group.Val2;
2210 
2211  if(!GroupIdH.IsKey(GroupNum))
2212  {
2213  TInt RandomRowId = RowIds[0]; // Arbitrarily select the 1st row.
2214  GroupIdH.AddDat(GroupNum, RandomRowId);
2215  }
2216  }
2217 
2218  for(TRowIterator RowI = GroupJointTable->BegRI(); RowI < GroupJointTable->EndRI(); RowI++)
2219  {
2220  // The GroupJoinTable has a well defined structure - columns 0 and 1 are GroupIds
2221  TInt GroupId1 = GroupJointTable->IntCols[0][RowI.GetRowIdx()];
2222  TInt GroupId2 = GroupJointTable->IntCols[1][RowI.GetRowIdx()];
2223 
2224  // Get the rows for groupid1 and groupid and arbitrary select one row
2225  TInt RowId1 = GroupIdH.GetDat(GroupId1);
2226  TInt RowId2 = GroupIdH.GetDat(GroupId2);
2227  JointTable->AddJointRow(*this, *this, RowId1, RowId2);
2228  }
2229 
2230  // Add the simiarlity column from the GroupJointTable - GroupJointTable has a
2231  // well defined structure - The first float column is the similarity;
2232  JointTable->StoreFltCol(DistanceColName, GroupJointTable->FltCols[0]);
2233  ProjectionV.Clr();
2234  ProjectionV.Add(DistanceColName);
2235 
2236  // Find the GroupBy columns in the JointTable by matching the Suffix of the Schema
2237  // columns with the original GroupBy columns - Note that Join renames columns.
2238  for(TInt i=0; i<GroupBy.Len(); i++){
2239  for(TInt j=0; j<JointTable->Sch.Len(); j++)
2240  {
2241  TStr ColName = JointTable->Sch[j].Val1;
2242  if(ColName.IsStrIn(GroupBy[i]))
2243  {
2244  ProjectionV.Add(ColName);
2245  }
2246  }
2247  }
2248 
2249  JointTable->ProjectInPlace(ProjectionV);
2250  JointTable->InitIds();
2251  return JointTable;
2252 }
2253 
2254 // Increments the next vector and set last, NumRows and NumValidRows.
// Appends one new valid row slot at the end of the row list: bumps NumRows
// and NumValidRows, points the previously last Next entry at the new row
// index (NumValidRows-1), and terminates the list with Last.
// NOTE(review): the new row index is taken as NumValidRows-1, which equals the
// physical index only while no rows have been removed - presumably this is only
// used while filling a freshly created table (as SelfSimJoinPerGroup does); confirm.
2256 {
2257  // Advance the Next vector
2258  NumRows++;
2259  NumValidRows++;
2260  if (!Next.Empty()) {
2261  Next[Next.Len()-1] = NumValidRows-1;
2263  }
2264  Next.Add(Last);
2265 }
2266 
2267 // Q: Do we want to have any gurantees in terms of order of the 0t rows - i.e.
2268 // ordered by "this" table row idx as primary key and "Table" row idx as secondary key
2269  // This means only keeping joint row indices (pairs of original row indices), sorting them
2270  // and adding all rows in the end. Sorting can be expensive, but we would be able to pre-allocate
2271  // memory for the joint table..
// Equi-join of this table with Table on this[Col1] == Table[Col2].
// The table with fewer valid rows (TS) is hashed on its join column and every
// row of the other table (TB) probes that hash. Each match appends a joint row
// built from (row of *this, row of Table), whichever side was hashed.
// Throws if either column is missing or their types differ.
// NOTE(review): the printf calls placed after TExcept::Throw below are
// unreachable when Throw throws; ThresholdJoinInputCorrectness prints first.
2272 PTable TTable::Join(const TStr& Col1, const TTable& Table, const TStr& Col2) {
2273  // double startFn = omp_get_wtime();
2274  if (!IsColName(Col1)) {
2275  TExcept::Throw("no such column " + Col1);
2276  printf("no such column %s\n", Col1.CStr());
2277  }
2278  if (!Table.IsColName(Col2)) {
2279  TExcept::Throw("no such column " + Col2);
2280  printf("no such column %s\n", Col2.CStr());
2281  }
2282  if (GetColType(Col1) != Table.GetColType(Col2)) {
2283  TExcept::Throw("Trying to Join on columns of different type");
2284  printf("Trying to Join on columns of different type\n");
2285  }
2286  //printf("passed initial checks\n");
2287  // initialize result table
2288  PTable JointTable = InitializeJointTable(Table);
2289  //printf("initialized joint table\n");
2290  // hash smaller table (group by column)
2291  TAttrType ColType = GetColType(Col1);
2292  TBool ThisIsSmaller = (NumValidRows <= Table.NumValidRows);
2293  const TTable& TS = ThisIsSmaller ? *this : Table;
2294  const TTable& TB = ThisIsSmaller ? Table : *this;
2295  TStr ColS = ThisIsSmaller ? Col1 : Col2;
2296  TStr ColB = ThisIsSmaller ? Col2 : Col1;
// The column index is resolved on the table that owns ColB.
2297  TInt ColBId = ThisIsSmaller ? Table.GetColIdx(ColB) : GetColIdx(ColB);
2298  // double endInit = omp_get_wtime();
2299  // printf("Init time = %f\n", endInit-startFn);
2300  // iterate over the rows of the bigger table and check for "collisions"
2301  // with the group keys for the small table.
2302 #ifdef GCC_ATOMIC
// Parallel path: split TB's rows into partitions, collect matching
// (row of *this, row of Table) index pairs per partition, then append all
// joint rows in one AddNJointRowsMP call.
// NOTE(review): Partitions[0] is read without checking that GetPartitionRanges
// returned at least one range - confirm behavior for an empty TB.
2303  if (GetMP()) {
2304  switch(ColType){
2305  case atInt:{
2307  TS.GroupByIntColMP(ColS, T, true);
2308  // double endGroup = omp_get_wtime();
2309  // printf("Group time = %f\n", endGroup-endInit);
2310 
2311  TIntPrV Partitions;
2312  TB.GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
2313  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2314  TVec<TIntPrV> JointRowIDSet(Partitions.Len());
2315  // double endPart = omp_get_wtime();
2316  // printf("Partition time = %f\n", endPart-endGroup);
2317 
// Each iteration writes only its own JointRowIDSet[i], so no locking is needed.
2318  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
2319  for (int i = 0; i < Partitions.Len(); i++){
2320  //double start = omp_get_wtime();
2321  JointRowIDSet[i].Reserve(PartitionSize);
2322  TRowIterator RowI(Partitions[i].GetVal1(), &TB);
2323  TRowIterator EndI(Partitions[i].GetVal2(), &TB);
2324  while (RowI < EndI) {
2325  TInt K = RowI.GetIntAttr(ColBId);
2326  if(T.IsKey(K)){
2327  TIntV& Group = T.GetDat(K);
2328  for(TInt j = 0; j < Group.Len(); j++){
// Pairs are always ordered (row of *this, row of Table).
2329  if(ThisIsSmaller){
2330  JointRowIDSet[i].Add(TIntPr(Group[j], RowI.GetRowIdx()));
2331  } else{
2332  JointRowIDSet[i].Add(TIntPr(RowI.GetRowIdx(), Group[j]));
2333  }
2334  }
2335  }
2336  RowI++;
2337  }
2338  //double end = omp_get_wtime();
2339  //printf("END: Thread %d: i = %d, start = %d, end = %d, num = %d, time = %f\n", omp_get_thread_num(), i,
2340  // Partitions[i].GetVal1().Val, Partitions[i].GetVal2().Val, JointRowIDSet[i].Len(), end-start);
2341  }
2342  // double endJoin = omp_get_wtime();
2343  // printf("Iterate time = %f\n", endJoin-endPart);
2344  JointTable->AddNJointRowsMP(*this, Table, JointRowIDSet);
2345  // double endAdd = omp_get_wtime();
2346  // printf("Add time = %f\n", endAdd-endJoin);
2347  break;
2348  }
2349  case atFlt:{
2351  TS.GroupByFltCol(ColS, T, TIntV(), true);
2352 
2353  TIntPrV Partitions;
2354  TB.GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
2355  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2356  TVec<TIntPrV> JointRowIDSet(Partitions.Len());
2357 
2358  #pragma omp parallel for schedule(dynamic)
2359  for (int i = 0; i < Partitions.Len(); i++){
2360  JointRowIDSet[i].Reserve(PartitionSize);
2361  TRowIterator RowI(Partitions[i].GetVal1(), &TB);
2362  TRowIterator EndI(Partitions[i].GetVal2(), &TB);
2363  while (RowI < EndI) {
2364  TFlt K = RowI.GetFltAttr(ColBId);
2365  if(T.IsKey(K)){
2366  TIntV& Group = T.GetDat(K);
2367  for(TInt j = 0; j < Group.Len(); j++){
2368  if(ThisIsSmaller){
2369  JointRowIDSet[i].Add(TIntPr(Group[j], RowI.GetRowIdx()));
2370  } else{
2371  JointRowIDSet[i].Add(TIntPr(RowI.GetRowIdx(), Group[j]));
2372  }
2373  }
2374  }
2375  RowI++;
2376  }
2377  }
2378  JointTable->AddNJointRowsMP(*this, Table, JointRowIDSet);
2379  break;
2380  }
2381  case atStr:{
2383  TS.GroupByStrCol(ColS, T, TIntV(), true);
2384 
2385  TIntPrV Partitions;
2386  TB.GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
2387  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2388  TVec<TIntPrV> JointRowIDSet(Partitions.Len());
2389 
2390  #pragma omp parallel for schedule(dynamic)
2391  for (int i = 0; i < Partitions.Len(); i++){
2392  JointRowIDSet[i].Reserve(PartitionSize);
2393  TRowIterator RowI(Partitions[i].GetVal1(), &TB);
2394  TRowIterator EndI(Partitions[i].GetVal2(), &TB);
2395  while (RowI < EndI) {
// String columns are matched on their integer string-map ids.
2396  TInt K = RowI.GetStrMapById(ColBId);
2397  if(T.IsKey(K)){
2398  TIntV& Group = T.GetDat(K);
2399  for(TInt j = 0; j < Group.Len(); j++){
2400  if(ThisIsSmaller){
2401  JointRowIDSet[i].Add(TIntPr(Group[j], RowI.GetRowIdx()));
2402  } else{
2403  JointRowIDSet[i].Add(TIntPr(RowI.GetRowIdx(), Group[j]));
2404  }
2405  }
2406  }
2407  RowI++;
2408  }
2409  }
2410  JointTable->AddNJointRowsMP(*this, Table, JointRowIDSet);
2411  }
2412  break;
2413  }
2414  } else {
2415 #endif // GCC_ATOMIC
// Sequential path: probe the hash row by row and append each joint row directly.
2416  switch (ColType) {
2417  case atInt:{
2418  TIntIntVH T;
2419  TS.GroupByIntCol(ColS, T, TIntV(), true);
2420  for (TRowIterator RowI = TB.BegRI(); RowI < TB.EndRI(); RowI++) {
2421  TInt K = RowI.GetIntAttr(ColBId);
2422  if (T.IsKey(K)) {
2423  TIntV& Group = T.GetDat(K);
2424  for (TInt i = 0; i < Group.Len(); i++) {
2425  if (ThisIsSmaller) {
2426  JointTable->AddJointRow(*this, Table, Group[i], RowI.GetRowIdx());
2427  } else {
2428  JointTable->AddJointRow(*this, Table, RowI.GetRowIdx(), Group[i]);
2429  }
2430  }
2431  }
2432  }
2433  break;
2434  }
2435  case atFlt:{
2437  TS.GroupByFltCol(ColS, T, TIntV(), true);
2438  for (TRowIterator RowI = TB.BegRI(); RowI < TB.EndRI(); RowI++) {
2439  TFlt K = RowI.GetFltAttr(ColBId);
2440  if (T.IsKey(K)) {
2441  TIntV& Group = T.GetDat(K);
2442  for (TInt i = 0; i < Group.Len(); i++) {
2443  if (ThisIsSmaller) {
2444  JointTable->AddJointRow(*this, Table, Group[i], RowI.GetRowIdx());
2445  } else {
2446  JointTable->AddJointRow(*this, Table, RowI.GetRowIdx(), Group[i]);
2447  }
2448  }
2449  }
2450  }
2451  break;
2452  }
2453  case atStr:{
2454  TIntIntVH T;
2455  TS.GroupByStrCol(ColS, T, TIntV(), true);
2456  for (TRowIterator RowI = TB.BegRI(); RowI < TB.EndRI(); RowI++) {
2457  TInt K = RowI.GetStrMapById(ColBId);
2458  if (T.IsKey(K)) {
2459  TIntV& Group = T.GetDat(K);
2460  for (TInt i = 0; i < Group.Len(); i++) {
2461  if (ThisIsSmaller) {
2462  JointTable->AddJointRow(*this, Table, Group[i], RowI.GetRowIdx());
2463  } else {
2464  JointTable->AddJointRow(*this, Table, RowI.GetRowIdx(), Group[i]);
2465  }
2466  }
2467  }
2468  }
2469  }
2470  break;
2471  }
2472 #ifdef GCC_ATOMIC
2473  }
2474 #endif
2475  return JointTable;
2476 }
2477 
2478 void TTable::ThresholdJoinInputCorrectness(const TStr& KeyCol1, const TStr& JoinCol1, const TTable& Table,
2479  const TStr& KeyCol2, const TStr& JoinCol2){
2480  if (!IsColName(KeyCol1)) {
2481  printf("no such column %s\n", KeyCol1.CStr());
2482  TExcept::Throw("no such column " + KeyCol1);
2483  }
2484  if (!Table.IsColName(KeyCol2)) {
2485  printf("no such column %s\n", KeyCol2.CStr());
2486  TExcept::Throw("no such column " + KeyCol2);
2487  }
2488  if (!IsColName(JoinCol1)) {
2489  printf("no such column %s\n", JoinCol1.CStr());
2490  TExcept::Throw("no such column " + JoinCol1);
2491  }
2492  if (!Table.IsColName(JoinCol2)) {
2493  printf("no such column %s\n", JoinCol2.CStr());
2494  TExcept::Throw("no such column " + JoinCol2);
2495  }
2496  if (GetColType(JoinCol1) != Table.GetColType(JoinCol2)) {
2497  printf("Trying to Join on columns of different type\n");
2498  TExcept::Throw("Trying to Join on columns of different type");
2499  }
2500  if (GetColType(KeyCol1) != Table.GetColType(KeyCol2)) {
2501  printf("Key type mismatch\n");
2502  TExcept::Throw("Key type mismatch");
2503  }
2504 }
2505 
2507  const TIntIntVH& T, TInt JoinColIdxB, TInt KeyColIdxB, TInt KeyColIdxS,
2508  THash<TIntPr,TIntTr>& Counters, TBool ThisIsSmaller, TAttrType JoinColType, TAttrType KeyType){
// Counts joint tuples per key pair for ThresholdJoin.
// TB is scanned row by row. T maps each join value of TS to the physical row
// ids of TS holding that value. ThisIsSmaller tells whether TS is *this.
// Counters maps (key in *this, key in Table) --> (row idx in *this,
// row idx in Table, count). The row indices are those of the first joint
// tuple seen for the key pair; count is the number of joint tuples for it.
// For string columns (JoinColType / KeyType == atStr), the integer
// string-map ids are used in place of the strings.
// NOTE(review): IsKey followed by GetDat performs two hash lookups per probe.
2509  // iterate over big table and count / record joint tuples
2510  for (TRowIterator RowI = TB.BegRI(); RowI < TB.EndRI(); RowI++) {
2511  // value to join on from big table
2512  TInt JVal = 0;
2513  if(JoinColType == atStr){
2514  JVal = RowI.GetStrMapById(JoinColIdxB);
2515  } else{
2516  JVal = RowI.GetIntAttr(JoinColIdxB);
2517  }
2518  //printf("JVal: %d\n", JVal.Val);
2519  if(T.IsKey(JVal)){
2520  // read key attribute of big table row
2521  TInt KeyB = 0;
2522  if(KeyType == atStr){
2523  KeyB = RowI.GetStrMapById(KeyColIdxB);
2524  } else{
2525  KeyB = RowI.GetIntAttr(KeyColIdxB);
2526  }
2527  // read row ids from small table with join attribute value of JVal
2528  const TIntV& RelevantRows = T.GetDat(JVal);
2529  for(int i = 0; i < RelevantRows.Len(); i++){
2530  // read key attribute of relevant row from small table
2531  TInt KeyS = 0;
2532  if(KeyType == atStr){
2533  KeyS = TS.StrColMaps[KeyColIdxS][RelevantRows[i]];
2534  } else{
2535  KeyS = TS.IntCols[KeyColIdxS][RelevantRows[i]];
2536  }
2537  // create a pair of keys - serves as a key in Counters
// Ordered as (key in *this, key in Table) regardless of which side is smaller.
2538  TIntPr Keys = ThisIsSmaller ? TIntPr(KeyS, KeyB) : TIntPr(KeyB, KeyS);
2539  if(Counters.IsKey(Keys)){
2540  // if the key pair has been seen before - increment its counter by 1
2541  TIntTr& V = Counters.GetDat(Keys);
2542  V.Val3 = V.Val3 + 1;
2543  } else{
2544  // if the key pair hasn't been seen before - add it with value of
2545  // row indices that create a joint record with this key pair
2546  if(ThisIsSmaller){
2547  Counters.AddDat(Keys, TIntTr(RelevantRows[i], RowI.GetRowIdx(),1));
2548  } else{
2549  Counters.AddDat(Keys, TIntTr(RowI.GetRowIdx(), RelevantRows[i],1));
2550  }
2551  }
2552  } // end of for loop
2553  } // end of if statement
2554  } // end of for loop
2555 }
2556 
2558  const TIntIntVH& T, TInt JoinColIdxB, TInt KeyColIdxB, TInt KeyColIdxS,
2559  THash<TIntTr,TIntTr>& Counters, TBool ThisIsSmaller, TAttrType JoinColType, TAttrType KeyType){
// Same scan as ThresholdJoinCountCollisions, but counts separately for each
// join value. Counters maps (key in *this, key in Table, join value) -->
// (row idx in *this, row idx in Table, count). The row indices are those of
// the first joint tuple seen for that triple.
// TB is scanned; T maps TS's join values to TS's physical row ids;
// ThisIsSmaller tells whether TS is *this. String columns are handled through
// their integer string-map ids.
2560  for (TRowIterator RowI = TB.BegRI(); RowI < TB.EndRI(); RowI++) {
2561  // value to join on from big table
2562  TInt JVal = 0;
2563  if(JoinColType == atStr){
2564  JVal = RowI.GetStrMapById(JoinColIdxB);
2565  } else{
2566  JVal = RowI.GetIntAttr(JoinColIdxB);
2567  }
2568  //printf("JVal: %d\n", JVal.Val);
2569  if(T.IsKey(JVal)){
2570  // read key attribute of big table row
2571  TInt KeyB = 0;
2572  if(KeyType == atStr){
2573  KeyB = RowI.GetStrMapById(KeyColIdxB);
2574  } else{
2575  KeyB = RowI.GetIntAttr(KeyColIdxB);
2576  }
2577  // read row ids from small table with join attribute value of JVal
2578  const TIntV& RelevantRows = T.GetDat(JVal);
2579  for(int i = 0; i < RelevantRows.Len(); i++){
2580  // read key attribute of relevant row from small table
2581  TInt KeyS = 0;
2582  if(KeyType == atStr){
2583  KeyS = TS.StrColMaps[KeyColIdxS][RelevantRows[i]];
2584  } else{
2585  KeyS = TS.IntCols[KeyColIdxS][RelevantRows[i]];
2586  }
2587  // create a pair of keys - serves as a key in Counters
2588  TIntPr Keys = ThisIsSmaller ? TIntPr(KeyS, KeyB) : TIntPr(KeyB, KeyS);
// Extend the key pair with the join value so counts are kept per join value.
2589  TIntTr K(Keys.Val1,Keys.Val2,JVal);
2590  if(Counters.IsKey(K)){
2591  // if the key pair has been seen before - increment its counter by 1
2592  TIntTr& V = Counters.GetDat(K);
2593  V.Val3 = V.Val3 + 1;
2594  } else{
2595  // if the key pair hasn't been seen before - add it with value of
2596  // row indices that create a joint record with this key pair
2597  if(ThisIsSmaller){
2598  Counters.AddDat(K, TIntTr(RelevantRows[i], RowI.GetRowIdx(),1));
2599  } else{
2600  Counters.AddDat(K, TIntTr(RowI.GetRowIdx(), RelevantRows[i],1));
2601  }
2602  }
2603  } // end of for loop
2604  } // end of if statement
2605  } // end of for loop
2606  }
2607 
2608 PTable TTable::ThresholdJoinOutputTable(const THash<TIntPr,TIntTr>& Counters, TInt Threshold, const TTable& Table){
2609  // initialize result table
2610  PTable JointTable = InitializeJointTable(Table);
2611  for(THash<TIntPr,TIntTr>::TIter iter = Counters.BegI(); iter < Counters.EndI(); iter++){
2612  TIntTr& Counter = iter.GetDat();
2613  //printf("keys: %d, %d\n", iter.GetKey().Val1.Val, iter.GetKey().Val2.Val);
2614  //printf("selected rows: %d,%d, counter: %d\n", Counter.Val1.Val, Counter.Val2.Val, Counter.Val3.Val);
2615  if(Counter.Val3 >= Threshold){
2616  JointTable->AddJointRow(*this, Table, Counter.Val1, Counter.Val2);
2617  }
2618  }
2619  return JointTable;
2620 }
2621 
2623  PTable JointTable = InitializeJointTable(Table);
2624  for(THash<TIntTr,TIntTr>::TIter iter = Counters.BegI(); iter < Counters.EndI(); iter++){
2625  const TIntTr& Counter = iter.GetDat();
2626  const TIntTr& Keys = iter.GetKey();
2627  THashSet<TIntPr> Pairs;
2628  if(Counter.Val3 >= Threshold){
2629  TIntPr K(Keys.Val1,Keys.Val2);
2630  if(!Pairs.IsKey(K)){
2631  Pairs.AddKey(K);
2632  JointTable->AddJointRow(*this, Table, Counter.Val1, Counter.Val2);
2633  }
2634  }
2635  }
2636  return JointTable;
2637 }
2638 
2639 
2640 // expected output: one joint tuple (R1,R2) with:
2641 // (1) R1[KeyCol1] = K1 and R2[KeyCol2] = K2
2642 // for every pair of keys (K1,K2) such that the number of joint tuples
2643 // (joint on R1[JoinCol1] = R2[JointCol2]) that hold property (1) is at least Threshold
2644 PTable TTable::ThresholdJoin(const TStr& KeyCol1, const TStr& JoinCol1, const TTable& Table,
2645  const TStr& KeyCol2, const TStr& JoinCol2, TInt Threshold, TBool PerJoinKey){
2646  // test input correctness
2647  ThresholdJoinInputCorrectness(KeyCol1, JoinCol1, Table, KeyCol2, JoinCol2);
2648  //printf("verified input correctness\n");
2649  // type of column on which we join (currently support only int)
2650  TAttrType JoinColType = GetColType(JoinCol1);
2651  // type of key column (currently support only int)
2652  TAttrType KeyType = GetColType(KeyCol1);
2653  // Determine which table is smaller
2654  TBool ThisIsSmaller = (NumValidRows <= Table.NumValidRows);
2655  const TTable& TS = ThisIsSmaller ? *this : Table;
2656  const TTable& TB = ThisIsSmaller ? Table : *this;
2657  TStr JoinColS = JoinCol1;
2658  TInt JoinColIdxB = GetColIdx(JoinCol2);
2659  TInt KeyColIdxS = GetColIdx(KeyCol1);
2660  TInt KeyColIdxB = GetColIdx(KeyCol2);
2661  if(!ThisIsSmaller){
2662  JoinColS = JoinCol2;
2663  JoinColIdxB = GetColIdx(JoinCol1);
2664  KeyColIdxS = GetColIdx(KeyCol2);
2665  KeyColIdxB = GetColIdx(KeyCol1);
2666  }
2667 
2668  // debug print
2669  //printf("JoinColS = %d, JoinColIdxB = %d, KeyColIdxS = %d, KeyColIdxB = %d\n",
2670  //GetColIdx(JoinColS).Val, JoinColIdxB.Val, KeyColIdxS.Val, KeyColIdxB.Val);
2671  //printf("starting switch-case\n");
2672 
2673  if(KeyType != atInt && KeyType != atStr){
2674  printf("ThresholdJoin only supports integer or string key attributes\n");
2675  TExcept::Throw("ThresholdJoin only supports integer or string key attributes");
2676  }
2677  if(JoinColType != atInt && JoinColType != atStr){
2678  printf("ThresholdJoin only supports integer or string join attributes\n");
2679  TExcept::Throw("ThresholdJoin only supports integer or string join attributes");
2680  }
2681  //printf("starting the real stuff!\n");
2682  // hash the smaller table T: join col value --> physical row ids of rows with that value
2683  TIntIntVH T;
2684  if(JoinColType == atInt){
2685  TS.GroupByIntCol(JoinColS, T, TIntV(), true);
2686  } else if(JoinColType == atStr){
2687  TS.GroupByStrCol(JoinColS, T, TIntV(), true);
2688  } else{
2689  TExcept::Throw("ThresholdJoin only supports integer or string join attributes");
2690  }
2691 
2692  /*
2693  for(THash<TInt,TIntV>::TIter it = T.BegI(); it < T.EndI(); it++){
2694  if(JoinColType == atStr){
2695  printf("%s -->", Context.StringVals.GetKey(it.GetKey().Val));
2696  } else{
2697  printf("%d -->", it.GetKey().Val);
2698  }
2699  const TIntV& V = it.GetDat();
2700  for(int sr = 0; sr < V.Len(); sr++){
2701  printf(" %d", V[sr].Val);
2702  }
2703  printf("\n");
2704  }
2705  */
2706 
2707  // Counters: (K1,K2) --> (RowIdx1,RowIdx2, count) where K1 is a key from KeyCol1,
2708  // K2 is a key from Table's KeyCol2; RowIdx1 and RowIdx2 are physical row ids
2709  // that participates in a joint tuple that satisfies (1).
2710  // count is the count of joint records that satisfy (1).
2711  // In case of string attributes - the integer mappings of the key attribute values are used.
2712  if(PerJoinKey){
2713  //printf("PerJoinKey\n");
2714  THash<TIntTr,TIntTr> Counters;
2715  ThresholdJoinCountPerJoinKeyCollisions(TB, TS, T, JoinColIdxB, KeyColIdxB, KeyColIdxS, Counters, ThisIsSmaller, JoinColType, KeyType);
2716  /*
2717  for(THash<TIntTr,TIntTr>::TIter it = Counters.BegI(); it < Counters.EndI(); it++){
2718  const TIntTr& K = it.GetKey();
2719  const TIntTr& V = it.GetDat();
2720  if(KeyType == atStr){
2721  printf("%s %s --> %d %d %d\n", Context->StringVals.GetKey(K.Val1), Context->StringVals.GetKey(K.Val2), V.Val1.Val, V.Val2.Val, V.Val3.Val);
2722  } else{
2723  printf("%d %d --> %d %d %d\n", K.Val1.Val, K.Val2.Val, V.Val1.Val, V.Val2.Val, V.Val3.Val);
2724  }
2725  }
2726  */
2727  //printf("found collisions\n");
2728  return ThresholdJoinPerJoinKeyOutputTable(Counters, Threshold, Table);
2729  } else{
2730  //printf("not PerJoinKey\n");
2731  THash<TIntPr,TIntTr> Counters;
2732  ThresholdJoinCountCollisions(TB, TS, T, JoinColIdxB, KeyColIdxB, KeyColIdxS, Counters, ThisIsSmaller, JoinColType, KeyType);
2733  /*
2734  for(THash<TIntPr,TIntTr>::TIter it = Counters.BegI(); it < Counters.EndI(); it++){
2735  const TIntPr& K = it.GetKey();
2736  const TIntTr& V = it.GetDat();
2737  if(KeyType == atStr){
2738  printf("%s %s --> %d %d %d\n", Context->StringVals.GetKey(K.Val1), Context->StringVals.GetKey(K.Val2), V.Val1.Val, V.Val2.Val, V.Val3.Val);
2739  } else{
2740  printf("%d %d --> %d %d %d\n", K.Val1.Val, K.Val2.Val, V.Val1.Val, V.Val2.Val, V.Val3.Val);
2741  }
2742  }
2743  */
2744  //printf("found collisions\n");
2745  return ThresholdJoinOutputTable(Counters, Threshold, Table);
2746  }
2747 }
2748 
2749 
// Evaluates Predicate on every row of this table.
// With Remove, rows for which Predicate is false are removed in place through
// RowI.RemoveNext(), and SelectedRows is left untouched. Without Remove, the
// table is unchanged and the physical ids of matching rows are appended to
// SelectedRows.
// NOTE(review): unlike SelectAtomicConst, the Remove branch does not set
// IsNextDirty - confirm whether it should.
2750 void TTable::Select(TPredicate& Predicate, TIntV& SelectedRows, TBool Remove) {
// NOTE(review): Selected is never used.
2751  TIntV Selected;
2752  TStrV RelevantCols;
// Column names referenced by the predicate's atoms (may contain repeats).
2753  Predicate.GetVariables(RelevantCols);
2754  TInt NumRelevantCols = RelevantCols.Len();
2755  TVec<TAttrType> ColTypes = TVec<TAttrType>(NumRelevantCols);
2756  TIntV ColIndices = TIntV(NumRelevantCols);
2757  for (TInt i = 0; i < NumRelevantCols; i++) {
2758  ColTypes[i] = GetColType(RelevantCols[i]);
2759  ColIndices[i] = GetColIdx(RelevantCols[i]);
2760  }
2761 
2762  if (Remove) {
// RowI here is a removal-capable iterator: it looks one row ahead
// (GetNextRowIdx / GetNext*Attr) so that the next row can be unlinked.
2764  while (RowI.GetNextRowIdx() != Last) {
2765  // prepare arguments for predicate evaluation
2766  for (TInt i = 0; i < NumRelevantCols; i++) {
2767  switch (ColTypes[i]) {
2768  case atInt:
2769  Predicate.SetIntVal(RelevantCols[i], RowI.GetNextIntAttr(ColIndices[i]));
2770  break;
2771  case atFlt:
2772  Predicate.SetFltVal(RelevantCols[i], RowI.GetNextFltAttr(ColIndices[i]));
2773  break;
2774  case atStr:
2775  Predicate.SetStrVal(RelevantCols[i], RowI.GetNextStrAttr(ColIndices[i]));
2776  break;
2777  }
2778  }
// Only advance when the next row is kept; RemoveNext() already exposes a new next row.
2779  if (!Predicate.Eval()) {
2780  RowI.RemoveNext();
2781  } else {
2782  RowI++;
2783  }
2784  }
2785  } else {
// NOTE(review): this branch reads attributes by column name on every row,
// whereas the Remove branch uses the precomputed ColIndices.
2786  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
2787  for (TInt i = 0; i < NumRelevantCols; i++) {
2788  switch (ColTypes[i]) {
2789  case atInt:
2790  Predicate.SetIntVal(RelevantCols[i], RowI.GetIntAttr(RelevantCols[i]));
2791  break;
2792  case atFlt:
2793  Predicate.SetFltVal(RelevantCols[i], RowI.GetFltAttr(RelevantCols[i]));
2794  break;
2795  case atStr:
2796  Predicate.SetStrVal(RelevantCols[i], RowI.GetStrAttr(RelevantCols[i]));
2797  break;
2798  }
2799  }
2800  if (Predicate.Eval()) { SelectedRows.Add(RowI.GetRowIdx()); }
2801  }
2802  }
2803 }
2804 
2805 void TTable::Classify(TPredicate& Predicate, const TStr& LabelName, const TInt& PositiveLabel, const TInt& NegativeLabel) {
2806  TIntV SelectedRows;
2807  Select(Predicate, SelectedRows, false);
2808  ClassifyAux(SelectedRows, LabelName, PositiveLabel, NegativeLabel);
2809 }
2810 
2811 
2812 // Further optimization: both comparison operation and type of columns don't change between rows..
// Compares two columns of this table row by row with Cmp (Col1 value as the
// left operand, Col2 value as the right).
// With Remove, rows for which the comparison is false are removed in place
// and SelectedRows is untouched. Otherwise the table is unchanged and the
// physical ids of matching rows are appended to SelectedRows.
// Throws "SelectAtomic: diff types" if the two columns differ in type.
// NOTE(review): unlike SelectAtomicConst, the Remove branch does not set
// IsNextDirty - confirm whether it should.
2813 void TTable::SelectAtomic(const TStr& Col1, const TStr& Col2, TPredComp Cmp, TIntV& SelectedRows, TBool Remove) {
2814  const TAttrType Ty1 = GetColType(Col1);
2815  const TAttrType Ty2 = GetColType(Col2);
2816  const TInt ColIdx1 = GetColIdx(Col1);
2817  const TInt ColIdx2 = GetColIdx(Col2);
2818  if (Ty1 != Ty2) {
2819  TExcept::Throw("SelectAtomic: diff types");
2820  }
// Substring comparisons only make sense on strings (checked via Assert only).
2821  if (Cmp == SUBSTR || Cmp == SUPERSTR) { Assert(Ty1 == atStr); }
2822 
2823  if (Remove) {
// RowI is a removal-capable iterator that inspects the next row so it can be unlinked.
2825  while (RowI.GetNextRowIdx() != Last) {
2826 
// NOTE(review): Result is assigned only in the three case labels below; for
// any other column type it keeps TBool's default value.
2827  TBool Result;
2828  switch (Ty1) {
2829  case atInt:
2830  Result = TPredicate::EvalAtom(RowI.GetNextIntAttr(ColIdx1), RowI.GetNextIntAttr(ColIdx2), Cmp);
2831  break;
2832  case atFlt:
2833  Result = TPredicate::EvalAtom(RowI.GetNextFltAttr(ColIdx1), RowI.GetNextFltAttr(ColIdx2), Cmp);
2834  break;
2835  case atStr:
2836  Result = TPredicate::EvalStrAtom(RowI.GetNextStrAttr(ColIdx1), RowI.GetNextStrAttr(ColIdx2), Cmp);
2837  break;
2838  }
2839 
2840  if (!Result) {
2841  RowI.RemoveNext();
2842  } else {
2843  RowI++;
2844  }
2845 
2846  }
2847  } else {
// NOTE(review): this branch reads attributes by column name on every row
// instead of using ColIdx1 / ColIdx2.
2848  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
2849  TBool Result;
2850  switch (Ty1) {
2851  case atInt:
2852  Result = TPredicate::EvalAtom(RowI.GetIntAttr(Col1), RowI.GetIntAttr(Col2), Cmp);
2853  break;
2854  case atFlt:
2855  Result = TPredicate::EvalAtom(RowI.GetFltAttr(Col1), RowI.GetFltAttr(Col2), Cmp);
2856  break;
2857  case atStr:
2858  Result = TPredicate::EvalStrAtom(RowI.GetStrAttr(Col1), RowI.GetStrAttr(Col2), Cmp);
2859  break;
2860  }
2861  if (Result) { SelectedRows.Add(RowI.GetRowIdx()); }
2862  }
2863  }
2864 }
2865 
2866 void TTable::ClassifyAtomic(const TStr& Col1, const TStr& Col2, TPredComp Cmp,
2867  const TStr& LabelName, const TInt& PositiveLabel, const TInt& NegativeLabel) {
2868  TIntV SelectedRows;
2869  SelectAtomic(Col1, Col2, Cmp, SelectedRows, false);
2870  ClassifyAux(SelectedRows, LabelName, PositiveLabel, NegativeLabel);
2871 }
2872 
2874  TIntV& SelectedRows, PTable& SelectedTable, TBool Remove, TBool Table) {
2875  //double startFn = omp_get_wtime();
2876  TStr ValTStr(Val.GetStr());
2877  TAttrType Type = GetColType(Col);
2878  TInt ColIdx = GetColIdx(Col);
2879 
2880  if (Type != Val.GetType()) {
2881  TExcept::Throw("SelectAtomicConst: coltype does not match const type");
2882  }
2883 
2884  if(Remove){
2885 #ifdef USE_OPENMP
2886  if (GetMP()) {
2887  //double endInit = omp_get_wtime();
2888  //printf("Init time = %f\n", endInit-startFn);
2889  TIntPrV Partitions;
2890  GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
2891  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2892  int RemoveCount = 0;
2893  //double endPart = omp_get_wtime();
2894  //printf("Partition time = %f\n", endPart-endInit);
2895 
2896  TIntPrV Bounds(Partitions.Len());
2897 
2898  // #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) reduction(+:RemoveCount) shared(Val)
2899  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) reduction(+:RemoveCount)
2900  for (int i = 0; i < Partitions.Len(); i++){
2901  //TPrimitive ThreadLocalVal(Val);
2902  TRowIterator RowI(Partitions[i].GetVal1(), this);
2903  TRowIterator EndI(Partitions[i].GetVal2(), this);
2904  TInt FirstRowIdx = TTable::Invalid;
2905  TInt LastRowIdx = TTable::Invalid;
2906  TBool First = true;
2907  while (RowI < EndI) {
2908  TInt CurrRowIdx = RowI.GetRowIdx();
2909  TBool Result;
2910  if (Type != atStr) {
2911  Result = RowI.CompareAtomicConst(ColIdx, Val, Cmp);
2912  } else {
2913  Result = RowI.CompareAtomicConstTStr(ColIdx, ValTStr, Cmp);
2914  }
2915  RowI++;
2916  if(!Result) {
2917  Next[CurrRowIdx] = TTable::Invalid;
2918  RemoveCount++;
2919  } else {
2920  if (First) { FirstRowIdx = CurrRowIdx; First = false; }
2921  else { Next[LastRowIdx] = CurrRowIdx; }
2922  LastRowIdx = CurrRowIdx;
2923  }
2924  }
2925  Bounds[i] = TIntPr(FirstRowIdx, LastRowIdx);
2926  //printf("Thread %d: i = %d, start = %d, end = %d\n", omp_get_thread_num(), i,
2927  // Partitions[i].GetVal1().Val, Partitions[i].GetVal2().Val);
2928  }
2929  //double endIter = omp_get_wtime();
2930  //printf("Iter time = %f\n", endIter-endPart);
2931 
2932  // repair the next vector
2933  TInt CurrBound = 0;
2934  while (CurrBound < Bounds.Len() && Bounds[CurrBound].Val1 == TTable::Invalid) {
2935  CurrBound++;
2936  }
2937  if (CurrBound == Bounds.Len()) {
2938  // selected table is empty
2939  Assert(NumValidRows == RemoveCount);
2940  NumValidRows = 0;
2943  } else {
2944  NumValidRows -= RemoveCount;
2945  FirstValidRow = Bounds[CurrBound].Val1;
2946  LastValidRow = Bounds[CurrBound].Val2;
2947  TInt PrevBound = CurrBound;
2948  CurrBound++;
2949  while (CurrBound < Bounds.Len()) {
2950  if (Bounds[CurrBound].Val1 == TTable::Invalid) { CurrBound++; continue; }
2951  Next[Bounds[PrevBound].Val2] = Bounds[CurrBound].Val1;
2952  LastValidRow = Bounds[CurrBound].Val2;
2953  PrevBound = CurrBound;
2954  CurrBound++;
2955  }
2956  Next[Bounds[PrevBound].Val2] = TTable::Last;
2957  }
2958  IsNextDirty = 1;
2959  //double endRepair = omp_get_wtime();
2960  //printf("Repair time = %f\n", endRepair-endIter);
2961  } else {
2962 #endif
2964  while(RowI.GetNextRowIdx() != Last){
2965  if (!RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
2966  RowI.RemoveNext();
2967  } else {
2968  RowI++;
2969  }
2970  }
2971  IsNextDirty = 1;
2972 #ifdef USE_OPENMP
2973  }
2974 #endif
2975  } else if (Table) {
2976 #ifdef USE_OPENMP
2977  if (GetMP()) {
2978  //double endInit = omp_get_wtime();
2979  //printf("Init time = %f\n", endInit-startFn);
2980  TIntPrV Partitions;
2981  GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
2982  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2983  //double endPart = omp_get_wtime();
2984  //printf("Partition time = %f\n", endPart-endInit);
2985 
2986  int TotalSelectedRows = 0;
2987  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) reduction(+:TotalSelectedRows)
2988  for (int i = 0; i < Partitions.Len(); i++){
2989  TRowIterator RowI(Partitions[i].GetVal1(), this);
2990  TRowIterator EndI(Partitions[i].GetVal2(), this);
2991  while (RowI < EndI) {
2992  if (Type != atStr) {
2993  if (RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
2994  TotalSelectedRows++;
2995  }
2996  } else {
2997  if (RowI.CompareAtomicConstTStr(ColIdx, ValTStr, Cmp)) {
2998  TotalSelectedRows++;
2999  }
3000  }
3001  RowI++;
3002  }
3003  }
3004  //double endCount = omp_get_wtime();
3005  //printf("Count time = %f\n", endCount-endPart);
3006 
3007  SelectedTable->ResizeTable(TotalSelectedRows);
3008  //double endResize = omp_get_wtime();
3009  //printf("Resize time = %f\n", endResize-endCount);
3010 
3011  if (TotalSelectedRows == 0) {
3012  // printf("Select: Empty output!\n");
3013  return;
3014  }
3015 
3016  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
3017  for (int i = 0; i < Partitions.Len(); i++){
3018  TIntV LocalSelectedRows;
3019  LocalSelectedRows.Reserve(PartitionSize);
3020  TRowIterator RowI(Partitions[i].GetVal1(), this);
3021  TRowIterator EndI(Partitions[i].GetVal2(), this);
3022  while (RowI < EndI) {
3023  if (Type != atStr) {
3024  if (RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
3025  LocalSelectedRows.Add(RowI.GetRowIdx());
3026  }
3027  } else {
3028  if (RowI.CompareAtomicConstTStr(ColIdx, ValTStr, Cmp)) {
3029  LocalSelectedRows.Add(RowI.GetRowIdx());
3030  }
3031  }
3032  RowI++;
3033  }
3034  SelectedTable->AddSelectedRows(*this, LocalSelectedRows);
3035  //printf("Thread %d: i = %d, start = %d, end = %d\n", omp_get_thread_num(), i,
3036  // Partitions[i].GetVal1().Val, Partitions[i].GetVal2().Val);
3037  }
3038  //double endIter = omp_get_wtime();
3039  //printf("Iter time = %f\n", endIter-endResize);
3040 
3041  //SelectedTable->ResizeTable(SelectedTable->GetNumValidRows());
3042  //double endResize2 = omp_get_wtime();
3043  //printf("Resize2 time = %f\n", endResize2-endIter);
3044  SelectedTable->SetFirstValidRow();
3045  } else {
3046 #endif
3047  for(TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++){
3048  if (RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
3049  SelectedTable->AddRowI(RowI);
3050  }
3051  }
3052 #ifdef USE_OPENMP
3053  }
3054 #endif
3055  } else {
3056  for(TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++){
3057  if (RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
3058  SelectedRows.Add(RowI.GetRowIdx());
3059  }
3060  }
3061  }
3062 }
3063 
3064 inline TInt TTable::CompareRows(TInt R1, TInt R2, const TAttrType& CompareByType, const TInt& CompareByIndex, TBool Asc) {
3065  //printf("comparing rows %d %d by %s\n", R1.Val, R2.Val, CompareBy.CStr());
3066  switch (CompareByType) {
3067  case atInt:{
3068  if (IntCols[CompareByIndex][R1] > IntCols[CompareByIndex][R2]) { return (Asc ? 1 : -1); }
3069  if (IntCols[CompareByIndex][R1] < IntCols[CompareByIndex][R2]) { return (Asc ? -1 : 1); }
3070  return 0;
3071  }
3072  case atFlt:{
3073  if (FltCols[CompareByIndex][R1] > FltCols[CompareByIndex][R2]) { return (Asc ? 1 : -1); }
3074  if (FltCols[CompareByIndex][R1] < FltCols[CompareByIndex][R2]) { return (Asc ? -1 : 1); }
3075  return 0;
3076  }
3077  case atStr:{
3078  TStr S1 = GetStrValIdx(CompareByIndex, R1);
3079  TStr S2 = GetStrValIdx(CompareByIndex, R2);
3080  int CmpRes = strcmp(S1.CStr(), S2.CStr());
3081  return (Asc ? CmpRes : -CmpRes);
3082  }
3083  }
3084  // code should not come here, added to remove a compiler warning
3085  return 0;
3086 }
3087 
3088 inline TInt TTable::CompareRows(TInt R1, TInt R2, const TVec<TAttrType>& CompareByTypes, const TIntV& CompareByIndices, TBool Asc) {
3089  for (TInt i = 0; i < CompareByTypes.Len(); i++) {
3090  TInt res = CompareRows(R1, R2, CompareByTypes[i], CompareByIndices[i], Asc);
3091  if (res != 0) { return res; }
3092  }
3093  return 0;
3094 }
3095 
3096 void TTable::ISort(TIntV& V, TInt StartIdx, TInt EndIdx, const TVec<TAttrType>& SortByTypes, const TIntV& SortByIndices, TBool Asc) {
3097  if (StartIdx < EndIdx) {
3098  for (TInt i = StartIdx+1; i <= EndIdx; i++) {
3099  TInt Val = V[i];
3100  TInt j = i;
3101  while ((StartIdx < j) && (CompareRows(V[j-1], Val, SortByTypes, SortByIndices, Asc) > 0)) {
3102  V[j] = V[j-1];
3103  j--;
3104  }
3105  V[j] = Val;
3106  }
3107  }
3108 }
3109 
3110 TInt TTable::GetPivot(TIntV& V, TInt StartIdx, TInt EndIdx, const TVec<TAttrType>& SortByTypes, const TIntV& SortByIndices, TBool Asc) {
3111  TInt L = EndIdx - StartIdx + 1;
3112  const TInt Idx1 = StartIdx + TInt::GetRnd(L);
3113  const TInt Idx2 = StartIdx + TInt::GetRnd(L);
3114  const TInt Idx3 = StartIdx + TInt::GetRnd(L);
3115  if (CompareRows(V[Idx1], V[Idx2], SortByTypes, SortByIndices, Asc) < 0) {
3116  if (CompareRows(V[Idx2], V[Idx3], SortByTypes, SortByIndices, Asc) < 0) { return Idx2; }
3117  if (CompareRows(V[Idx1], V[Idx3], SortByTypes, SortByIndices, Asc) < 0) { return Idx3; }
3118  return Idx1;
3119  } else {
3120  if (CompareRows(V[Idx3], V[Idx2], SortByTypes, SortByIndices, Asc) < 0) { return Idx2; }
3121  if (CompareRows(V[Idx3], V[Idx1], SortByTypes, SortByIndices, Asc) < 0) { return Idx3; }
3122  return Idx1;
3123  }
3124 }
3125 
3126 TInt TTable::Partition(TIntV& V, TInt StartIdx, TInt EndIdx, const TVec<TAttrType>& SortByTypes, const TIntV& SortByIndices, TBool Asc) {
3127 
3128  // test if the elements are already sorted
3129  TInt j;
3130  for (j = StartIdx; j < EndIdx; j++) {
3131  if (CompareRows(V[j], V[j+1], SortByTypes, SortByIndices, Asc) > 0) {
3132  break;
3133  }
3134  }
3135  if (j >= EndIdx) {
3136  return EndIdx+1;
3137  }
3138 
3139  TInt PivotIdx = GetPivot(V, StartIdx, EndIdx, SortByTypes, SortByIndices, Asc);
3140  TInt Pivot = V[PivotIdx];
3141  V.Swap(PivotIdx, EndIdx);
3142  TInt StoreIdx = StartIdx;
3143  for (TInt i = StartIdx; i < EndIdx; i++) {
3144  if (CompareRows(V[i], Pivot, SortByTypes, SortByIndices, Asc) <= 0) {
3145  V.Swap(i, StoreIdx);
3146  StoreIdx++;
3147  }
3148  }
3149  // move pivot value to its place
3150  V.Swap(StoreIdx, EndIdx);
3151  return StoreIdx;
3152 }
3153 
3154 void TTable::QSort(TIntV& V, TInt StartIdx, TInt EndIdx, const TVec<TAttrType>& SortByTypes, const TIntV& SortByIndices, TBool Asc) {
3155  if (StartIdx < EndIdx) {
3156  if (EndIdx - StartIdx < 20) {
3157  ISort(V, StartIdx, EndIdx, SortByTypes, SortByIndices, Asc);
3158  } else {
3159  TInt Pivot = Partition(V, StartIdx, EndIdx, SortByTypes, SortByIndices, Asc);
3160  if (Pivot > EndIdx) {
3161  return;
3162  }
3163  // Everything <= Pivot will be in StartIdx, Pivot-1. Shrink this
3164  // range to ignore elements equal to the pivot in the first
3165  // recursive call, to optimize for the case when a lot of
3166  // rows are equal.
3167  int Ub = Pivot - 1;
3168  while (Ub >= StartIdx && CompareRows(
3169  V[Ub], V[Pivot], SortByTypes, SortByIndices, Asc) == 0) {
3170  Ub -= 1;
3171  }
3172  QSort(V, StartIdx, Ub, SortByTypes, SortByIndices, Asc);
3173  QSort(V, Pivot+1, EndIdx, SortByTypes, SortByIndices, Asc);
3174  }
3175  }
3176 }
3177 
3178 void TTable::Merge(TIntV& V, TInt Idx1, TInt Idx2, TInt Idx3, const TVec<TAttrType>& SortByTypes, const TIntV& SortByIndices, TBool Asc) {
3179  TInt i = Idx1, j = Idx2;
3180  TIntV SortedV;
3181  while (i < Idx2 && j < Idx3) {
3182  if (CompareRows(V[i], V[j], SortByTypes, SortByIndices, Asc) <= 0) {
3183  SortedV.Add(V[i]);
3184  i++;
3185  }
3186  else {
3187  SortedV.Add(V[j]);
3188  j++;
3189  }
3190  }
3191  while (i < Idx2) {
3192  SortedV.Add(V[i]);
3193  i++;
3194  }
3195  while (j < Idx3) {
3196  SortedV.Add(V[j]);
3197  j++;
3198  }
3199 
3200  for (TInt sz = 0; sz < Idx3 - Idx1; sz++) {
3201  V[Idx1 + sz] = SortedV[sz];
3202  }
3203 }
3204 
#ifdef USE_OPENMP
// Parallel sort of the whole vector V: splits it into NumChunks equal chunks,
// sorts each chunk with QSort in parallel, then merges adjacent chunk pairs
// round by round until a single sorted run remains.
void TTable::QSortPar(TIntV& V, const TVec<TAttrType>& SortByTypes, const TIntV& SortByIndices, TBool Asc) {
  // Setting this to 8 because that results in the fastest sorting on Madmax.
  // Must stay a power of two: each merge round pairs up adjacent chunks.
  int NumChunks = 8;
  const TInt Sz = V.Len();
  TIntV IndV, NextV;
  // chunk c spans [IndV[c], IndV[c+1]); the last chunk absorbs the remainder
  for (int i = 0; i < NumChunks; i++) {
    IndV.Add(i * (Sz / NumChunks));
  }
  IndV.Add(Sz);

  // Thread counts are set per region with num_threads(...). The previous
  // omp_set_num_threads() calls changed the process-wide default and left it
  // at 1 after the last merge round, so every later parallel region ran
  // single-threaded.
  // NOTE(review): GetPivot draws from TInt::GetRnd; if that generator is a
  // shared static, the concurrent QSort calls below race on its state.
  #pragma omp parallel for num_threads(NumChunks)
  for (int i = 0; i < NumChunks; i++) {
    QSort(V, IndV[i], IndV[i+1] - 1, SortByTypes, SortByIndices, Asc);
  }

  while (NumChunks > 1) {
    #pragma omp parallel for num_threads(NumChunks / 2)
    for (int i = 0; i < NumChunks; i += 2) {
      Merge(V, IndV[i], IndV[i+1], IndV[i+2], SortByTypes, SortByIndices, Asc);
    }

    // boundaries of the merged runs for the next round
    NextV.Clr();
    for (int i = 0; i < NumChunks; i += 2) {
      NextV.Add(IndV[i]);
    }
    NextV.Add(Sz);
    IndV = NextV;

    NumChunks = NumChunks / 2;
  }
}
#endif // USE_OPENMP
3239 
3240 void TTable::Order(const TStrV& OrderBy, TStr OrderColName, TBool ResetRankByMSC, TBool Asc) {
3241  // get a vector of all valid row indices
3242  TIntV ValidRows = TIntV(NumValidRows);
3243  if (NumRows == NumValidRows) {
3244  for (TInt i = 0; i < NumValidRows; i++) {
3245  ValidRows[i] = i;
3246  }
3247  } else {
3248  TInt i = 0;
3249  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
3250  ValidRows[i] = RI.GetRowIdx();
3251  i++;
3252  }
3253  }
3254  TVec<TAttrType> OrderByTypes(OrderBy.Len());
3255  TIntV OrderByIndices(OrderBy.Len());
3256  for (TInt i = 0; i < OrderBy.Len(); i++) {
3257  OrderByTypes[i] = GetColType(OrderBy[i]);
3258  OrderByIndices[i] = GetColIdx(OrderBy[i]);
3259  }
3260 
3261  // sort that vector according to the attributes given in "OrderBy" in lexicographic order
3262 #ifdef USE_OPENMP
3263  if (GetMP()) {
3264  QSortPar(ValidRows, OrderByTypes, OrderByIndices, Asc);
3265  } else {
3266 #endif
3267  QSort(ValidRows, 0, NumValidRows-1, OrderByTypes, OrderByIndices, Asc);
3268 #ifdef USE_OPENMP
3269  }
3270 #endif
3271 
3272  // rewire Next vector
3273  IsNextDirty = 1;
3274  if (NumValidRows > 0) {
3275  FirstValidRow = ValidRows[0];
3276  } else {
3277  FirstValidRow = Last;
3278  }
3279  for (TInt i = 0; i < NumValidRows-1; i++) {
3280  Next[ValidRows[i]] = ValidRows[i+1];
3281  }
3282  if (NumValidRows > 0) {
3283  Next[ValidRows[NumValidRows-1]] = Last;
3284  LastValidRow = ValidRows[NumValidRows-1];
3285  } else {
3286  LastValidRow = Last;
3287  }
3288 
3289  // add rank column
3290  if (!OrderColName.Empty()) {
3291  TIntV RankCol = TIntV(NumRows);
3292  for (TInt i = 0; i < NumValidRows; i++) {
3293  RankCol[ValidRows[i]] = i;
3294  }
3295  if (ResetRankByMSC) {
3296  for (TInt i = 1; i < NumValidRows; i++) {
3297  TStr GroupName = OrderBy[0];
3298  if (GetStrVal(GroupName, ValidRows[i]) != GetStrVal(GroupName, ValidRows[i-1])) {
3299  RankCol[ValidRows[i]] = 0;
3300  } else {
3301  RankCol[ValidRows[i]] = RankCol[ValidRows[i-1]] + 1;
3302  }
3303  }
3304  }
3305  IntCols.Add(RankCol);
3306  AddSchemaCol(OrderColName, atInt);
3307  AddColType(OrderColName, atInt, IntCols.Len()-1);
3308  }
3309 }
3310 
3312  TInt FreeIndex = 0;
3313  TIntV Mapping; // Mapping[old_index] = new_index/invalid
3314 
3315  TInt IdColIdx = GetColIdx(IdColName);
3316 
3317  for (TInt i = 0; i < Next.Len(); i++) {
3318  if (Next[i] != TTable::Invalid) {
3319  // "first row" properly set beforehand
3320  if (FreeIndex == 0) {
3321  Assert (i == FirstValidRow);
3322  FirstValidRow = 0;
3323  }
3324 
3325  if (Next[i] != Last) {
3326  Next[FreeIndex] = FreeIndex + 1;
3327  Mapping.Add(FreeIndex);
3328  } else {
3329  Next[FreeIndex] = Last;
3330  LastValidRow = FreeIndex;
3331  Mapping.Add(Last);
3332  }
3333 
3334  RowIdMap.AddDat(IntCols[IdColIdx][i], FreeIndex);
3335 
3336  for (TInt j = 0; j < IntCols.Len(); j++) {
3337  IntCols[j][FreeIndex] = IntCols[j][i];
3338  }
3339  for (TInt j = 0; j < FltCols.Len(); j++) {
3340  FltCols[j][FreeIndex] = FltCols[j][i];
3341  }
3342  for (TInt j = 0; j < StrColMaps.Len(); j++) {
3343  StrColMaps[j][FreeIndex] = StrColMaps[j][i];
3344  }
3345 
3346  FreeIndex++;
3347  } else {
3348  NumRows--;
3349  Mapping.Add(TTable::Invalid);
3350  }
3351  }
3352 
3353  // should match, or bug somewhere
3355 }
3356 
3358  if (N == 0) {
3359  LastValidRow = -1;
3360  return;
3361  }
3362  TRowIterator RowI = BegRI();
3363  TInt count = 1;
3364  while (count < N) {
3365  if (!(RowI < EndRI())) {
3366  return; // The table contains less than N rows
3367  }
3368  RowI++;
3369  count++;
3370  }
3371  NumValidRows = N;
3372  TInt LastId = RowI.GetRowIdx();
3373  if (Next[LastId] == Last) {
3374  return; // The table contains exactly N rows
3375  }
3376  // The table contains more than N rows
3377  TInt CurrId = LastId;
3378  while (Next[CurrId] != Last) {
3379  Assert(Next[CurrId] != Invalid);
3380  TInt NextId = Next[CurrId];
3381  Next[CurrId] = Invalid;
3382  CurrId = NextId;
3383  }
3384  Next[LastId] = Last;
3385  LastValidRow = LastId;
3386 }
3387 
3388 inline void TTable::CheckAndAddIntNode(PNEANet Graph, THashSet<TInt>& NodeVals, TInt NodeId) {
3389  if (!NodeVals.IsKey(NodeId)) {
3390  Graph->AddNode(NodeId);
3391  NodeVals.AddKey(NodeId);
3392  }
3393 }
3394 
3395 inline void TTable::AddEdgeAttributes(PNEANet& Graph, int RowId) {
3396  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
3397  TStr ColName = EdgeAttrV[i];
3398  TAttrType T = GetColType(ColName);
3399  TInt Index = GetColIdx(ColName);
3400  switch (T) {
3401  case atInt:
3402  Graph->AddIntAttrDatE(RowId, IntCols[Index][RowId], ColName);
3403  break;
3404  case atFlt:
3405  Graph->AddFltAttrDatE(RowId, FltCols[Index][RowId], ColName);
3406  break;
3407  case atStr:
3408  Graph->AddStrAttrDatE(RowId, GetStrValIdx(Index, RowId), ColName);
3409  break;
3410  }
3411  }
3412 }
3413 
3414 inline void TTable::AddNodeAttributes(TInt NId, TStrV NodeAttrV, TInt RowId, THash<TInt, TStrIntVH>& NodeIntAttrs,
3415  THash<TInt, TStrFltVH>& NodeFltAttrs, THash<TInt, TStrStrVH>& NodeStrAttrs) {
3416  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
3417  TStr ColAttr = NodeAttrV[i];
3418  TAttrType CT = GetColType(ColAttr);
3419  int ColId = GetColIdx(ColAttr);
3420  // check if this is a common src-dst attribute
3421  for (TInt i = 0; i < CommonNodeAttrs.Len(); i++) {
3422  if (CommonNodeAttrs[i].Val1 == ColAttr || CommonNodeAttrs[i].Val2 == ColAttr) {
3423  ColAttr = CommonNodeAttrs[i].Val3;
3424  break;
3425  }
3426  }
3427  if (CT == atInt) {
3428  if (!NodeIntAttrs.IsKey(NId)) { NodeIntAttrs.AddKey(NId); }
3429  if (!NodeIntAttrs.GetDat(NId).IsKey(ColAttr)) { NodeIntAttrs.GetDat(NId).AddKey(ColAttr); }
3430  NodeIntAttrs.GetDat(NId).GetDat(ColAttr).Add(IntCols[ColId][RowId]);
3431  } else if (CT == atFlt) {
3432  if (!NodeFltAttrs.IsKey(NId)) { NodeFltAttrs.AddKey(NId); }
3433  if (!NodeFltAttrs.GetDat(NId).IsKey(ColAttr)) { NodeFltAttrs.GetDat(NId).AddKey(ColAttr); }
3434  NodeFltAttrs.GetDat(NId).GetDat(ColAttr).Add(FltCols[ColId][RowId]);
3435  } else {
3436  if (!NodeStrAttrs.IsKey(NId)) { NodeStrAttrs.AddKey(NId); }
3437  if (!NodeStrAttrs.GetDat(NId).IsKey(ColAttr)) { NodeStrAttrs.GetDat(NId).AddKey(ColAttr); }
3438  NodeStrAttrs.GetDat(NId).GetDat(ColAttr).Add(GetStrValIdx(ColId, RowId));
3439  }
3440  }
3441 }
3442 
3443 // Makes one pass over all the rows in the vector RowIds, and builds
3444 // a PNEANet, with each row as an edge between SrcCol and DstCol.
3445 PNEANet TTable::BuildGraph(const TIntV& RowIds, TAttrAggr AggrPolicy) {
3446  PNEANet Graph = TNEANet::New();
3447 
3448  const TAttrType NodeType = GetColType(SrcCol);
3449  Assert(NodeType == GetColType(DstCol));
3450  const TInt SrcColIdx = GetColIdx(SrcCol);
3451  const TInt DstColIdx = GetColIdx(DstCol);
3452 
3453  // node values - i.e. the unique values of src/dst col
3454  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
3455  THash<TFlt, TInt> FltNodeVals;
3456 
3457  // node attributes
3458  THash<TInt, TStrIntVH> NodeIntAttrs;
3459  THash<TInt, TStrFltVH> NodeFltAttrs;
3460  THash<TInt, TStrStrVH> NodeStrAttrs;
3461 
3462  // make single pass over all rows in given row id set
3463  for (TVec<TInt>::TIter it = RowIds.BegI(); it < RowIds.EndI(); it++) {
3464  TInt CurrRowIdx = *it;
3465 
3466  // add src and dst nodes to graph if they are not seen earlier
3467  TInt SVal, DVal;
3468  if (NodeType == atFlt) {
3469  TFlt FSVal = FltCols[SrcColIdx][CurrRowIdx];
3470  SVal = CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
3471  TFlt FDVal = FltCols[SrcColIdx][CurrRowIdx];
3472  DVal = CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
3473  } else if (NodeType == atInt || NodeType == atStr) {
3474  if (NodeType == atInt) {
3475  SVal = IntCols[SrcColIdx][CurrRowIdx];
3476  DVal = IntCols[DstColIdx][CurrRowIdx];
3477  } else {
3478  SVal = StrColMaps[SrcColIdx][CurrRowIdx];
3479  if (strlen(Context->StringVals.GetKey(SVal)) == 0) { continue; } //illegal value
3480  DVal = StrColMaps[DstColIdx][CurrRowIdx];
3481  if (strlen(Context->StringVals.GetKey(DVal)) == 0) { continue; } //illegal value
3482  }
3483  if (!Graph->IsNode(SVal)) { Graph->AddNode(SVal); }
3484  if (!Graph->IsNode(DVal)) { Graph->AddNode(DVal); }
3485  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
3486  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
3487  }
3488 
3489  // add edge and edge attributes
3490  Graph->AddEdge(SVal, DVal, CurrRowIdx);
3491  if (EdgeAttrV.Len() > 0) { AddEdgeAttributes(Graph, CurrRowIdx); }
3492 
3493  // get src and dst node attributes into hashmaps
3494  if (SrcNodeAttrV.Len() > 0) {
3495  AddNodeAttributes(SVal, SrcNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
3496  }
3497  if (DstNodeAttrV.Len() > 0) {
3498  AddNodeAttributes(DVal, DstNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
3499  }
3500  }
3501 
3502  // aggregate node attributes and add to graph
3503  if (SrcNodeAttrV.Len() > 0 || DstNodeAttrV.Len() > 0) {
3504  for (TNEANet::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
3505  TInt NId = NodeI.GetId();
3506  if (NodeIntAttrs.IsKey(NId)) {
3507  TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
3508  for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
3509  TInt AttrVal = AggregateVector<TInt>(it.GetDat(), AggrPolicy);
3510  Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
3511  }
3512  }
3513  if (NodeFltAttrs.IsKey(NId)) {
3514  TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
3515  for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
3516  TFlt AttrVal = AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
3517  Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
3518  }
3519  }
3520  if (NodeStrAttrs.IsKey(NId)) {
3521  TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
3522  for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
3523  TStr AttrVal = AggregateVector<TStr>(it.GetDat(), AggrPolicy);
3524  Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
3525  }
3526  }
3527  }
3528  }
3529 
3530  return Graph;
3531 }
3532 
3533 
3534 
3535 void TTable::InitRowIdBuckets(int NumBuckets) {
3536  for (TInt i = 0; i < RowIdBuckets.Len(); i++) {
3537  RowIdBuckets[i].Clr();
3538  }
3539  RowIdBuckets.Clr();
3540 
3541  RowIdBuckets.Gen(NumBuckets);
3542  for (TInt i = 0; i < NumBuckets; i++) {
3543  RowIdBuckets[i].Gen(10, 0);
3544  }
3545 }
3546 
3547 void TTable::FillBucketsByWindow(TStr SplitAttr, TInt JumpSize, TInt WindowSize, TInt StartVal, TInt EndVal) {
3548  Assert (JumpSize <= WindowSize);
3549  int NumBuckets, MinBucket, MaxBucket;
3550  TInt SplitColId = GetColIdx(SplitAttr);
3551 
3552  if (StartVal == TInt::Mn || EndVal == TInt::Mx) {
3553  // calculate min and max value of the column 'SplitAttr'
3554  TInt MinValue = TInt::Mx;
3555  TInt MaxValue = TInt::Mn;
3556  for (TInt i = 0; i < Next.Len(); i++) {
3557  if (Next[i] != Invalid) {
3558  if (MinValue > IntCols[SplitColId][i]) {
3559  MinValue = IntCols[SplitColId][i];
3560  }
3561  if (MaxValue < IntCols[SplitColId][i]) {
3562  MaxValue = IntCols[SplitColId][i];
3563  }
3564  }
3565  }
3566 
3567  if (StartVal == TInt::Mn) StartVal = MinValue;
3568  if (EndVal == TInt::Mx) EndVal = MaxValue;
3569  }
3570 
3571  // initialize buckets
3572  NumBuckets = 1;
3573  if (JumpSize > 0) {
3574  NumBuckets = (EndVal - StartVal)/JumpSize + 1;
3575  }
3576 
3577  InitRowIdBuckets(NumBuckets);
3578 
3579  // populate RowIdSets by computing the range of buckets for each row
3580  for (TInt i = 0; i < Next.Len(); i++) {
3581  if (Next[i] == Invalid) { continue; }
3582  int SplitVal = IntCols[SplitColId][i];
3583  if (SplitVal < StartVal || SplitVal > EndVal) { continue; }
3584  int RowVal = SplitVal - StartVal;
3585  if (JumpSize == 0) { // expanding windows
3586  MinBucket = RowVal/WindowSize;
3587  MaxBucket = NumBuckets-1;
3588  } else if (JumpSize == WindowSize) { // disjoint windows
3589  MinBucket = MaxBucket = RowVal/JumpSize;
3590  } else { // sliding windows
3591  if (RowVal < WindowSize) { MinBucket = 0; }
3592  else { MinBucket = (RowVal-WindowSize)/JumpSize + 1; }
3593  MaxBucket = RowVal/JumpSize;
3594  }
3595  for (TInt j = MinBucket; j <= MaxBucket; j++) { RowIdBuckets[j].Add(i); }
3596  }
3597 }
3598 
3599 void TTable::FillBucketsByInterval(TStr SplitAttr, TIntPrV SplitIntervals) {
3600  TInt SplitColId = GetColIdx(SplitAttr);
3601  int NumBuckets = SplitIntervals.Len();
3602  InitRowIdBuckets(NumBuckets);
3603 
3604  // populate RowIdSets by computing the range of buckets for each row
3605  for (TInt i = 0; i < Next.Len(); i++) {
3606  if (Next[i] == Invalid) { continue; }
3607  int SplitVal = IntCols[SplitColId][i];
3608  for (TInt j = 0; j < SplitIntervals.Len(); j++) {
3609  if (SplitVal >= SplitIntervals[j].Val1 && SplitVal < SplitIntervals[j].Val2) {
3610  RowIdBuckets[j].Add(i);
3611  }
3612  }
3613  }
3614 }
3615 
3617  //call BuildGraph on each row id set - parallelizable!
3618  TVec<PNEANet> GraphSequence;
3619  for (TInt i = 0; i < RowIdBuckets.Len(); i++) {
3620  if (RowIdBuckets[i].Len() == 0) { continue; }
3621  PNEANet PNet = BuildGraph(RowIdBuckets[i], AggrPolicy);
3622  GraphSequence.Add(PNet);
3623  }
3624 
3625  return GraphSequence;
3626 }
3627 
3629  CurrBucket = -1;
3630  this->AggrPolicy = AggrPolicy;
3631  return GetNextGraphFromSequence();
3632 }
3633 
3635  CurrBucket++;
3636  while (CurrBucket < RowIdBuckets.Len() && RowIdBuckets[CurrBucket].Len() == 0) {
3637  CurrBucket++;
3638  }
3639  if (CurrBucket >= RowIdBuckets.Len()) { return NULL; }
3641 }
3642 
3643 // Only integer SplitAttr supported
3644 // Setting JumpSize = WindowSize will give disjoint windows
3645 // Setting JumpSize < WindowSize will give sliding windows
3646 // Setting JumpSize > WindowSize will drop certain rows (currently not supported)
3647 // Setting JumpSize = 0 will give expanding windows (i.e. starting at 0 and ending at i*WindowSize)
3648 // To set the range of values of SplitAttr to be considered, use StartVal and EndVal (inclusive)
3649 // If StartVal == TInt.Mn, then the buckets will start from the min value of SplitAttr in the table.
3650 // If EndVal == TInt.Mx, then the buckets will end at the max value of SplitAttr in the table.
3651 TVec<PNEANet> TTable::ToGraphSequence(TStr SplitAttr, TAttrAggr AggrPolicy, TInt WindowSize, TInt JumpSize, TInt StartVal, TInt EndVal) {
3652  FillBucketsByWindow(SplitAttr, JumpSize, WindowSize, StartVal, EndVal);
3653  printf("buckets filled\n");
3654  return GetGraphsFromSequence(AggrPolicy);
3655 }
3656 
3657 TVec<PNEANet> TTable::ToVarGraphSequence(TStr SplitAttr, TAttrAggr AggrPolicy, TIntPrV SplitIntervals) {
3658  FillBucketsByInterval(SplitAttr, SplitIntervals);
3659  return GetGraphsFromSequence(AggrPolicy);
3660 }
3661 
3663  return ToGraphSequence(GroupAttr, AggrPolicy, TInt(1), TInt(1), TInt::Mn, TInt::Mx);
3664 }
3665 
3666 PNEANet TTable::ToGraphSequenceIterator(TStr SplitAttr, TAttrAggr AggrPolicy, TInt WindowSize, TInt JumpSize, TInt StartVal, TInt EndVal) {
3667  FillBucketsByWindow(SplitAttr, JumpSize, WindowSize, StartVal, EndVal);
3668  return GetFirstGraphFromSequence(AggrPolicy);
3669 }
3670 
3671 PNEANet TTable::ToVarGraphSequenceIterator(TStr SplitAttr, TAttrAggr AggrPolicy, TIntPrV SplitIntervals) {
3672  FillBucketsByInterval(SplitAttr, SplitIntervals);
3673  return GetFirstGraphFromSequence(AggrPolicy);
3674 }
3675 
3677  return ToGraphSequenceIterator(GroupAttr, AggrPolicy, TInt(1), TInt(1), TInt::Mn, TInt::Mx);
3678 }
3679 
3680 // calls to this must be preceded by a call to one of the above ToGraph*Iterator functions
3682  return GetNextGraphFromSequence();
3683 }
3684 
3686  return CurrBucket >= RowIdBuckets.Len() - 1;
3687 }
3688 
3690  Schema SR;
3691  SR.Add(TPair<TStr,TAttrType>("node_id",atInt));
3692 
3693  TStrV IntAttrNames;
3694  TStrV FltAttrNames;
3695  TStrV StrAttrNames;
3696 
3697  TNEANet::TNodeI NodeI = Network->BegNI();
3698  NodeI.GetIntAttrNames(IntAttrNames);
3699  NodeI.GetFltAttrNames(FltAttrNames);
3700  NodeI.GetStrAttrNames(StrAttrNames);
3701  for (TInt i = 0; i < IntAttrNames.Len(); i++) {
3702  SR.Add(TPair<TStr,TAttrType>(IntAttrNames[i],atInt));
3703  }
3704  for (TInt i = 0; i < FltAttrNames.Len(); i++) {
3705  SR.Add(TPair<TStr,TAttrType>(FltAttrNames[i],atFlt));
3706  }
3707  for (TInt i = 0; i < StrAttrNames.Len(); i++) {
3708  SR.Add(TPair<TStr,TAttrType>(StrAttrNames[i],atStr));
3709  }
3710 
3711  PTable T = New(SR, Context);
3712 
3713  TInt Cnt = 0;
3714  // populate table columns
3715  while (NodeI < Network->EndNI()) {
3716  T->IntCols[0].Add(NodeI.GetId());
3717  for (TInt i = 0; i < IntAttrNames.Len(); i++) {
3718  T->IntCols[i+1].Add(Network->GetIntAttrDatN(NodeI,IntAttrNames[i]));
3719  }
3720  for (TInt i = 0; i < FltAttrNames.Len(); i++) {
3721  T->FltCols[i].Add(Network->GetFltAttrDatN(NodeI,FltAttrNames[i]));
3722  }
3723  for (TInt i = 0; i < StrAttrNames.Len(); i++) {
3724  T->AddStrVal(i, Network->GetStrAttrDatN(NodeI,StrAttrNames[i]));
3725  }
3726  Cnt++;
3727  NodeI++;
3728  }
3729  // set number of rows and "Next" vector
3730  T->NumRows = Cnt;
3731  T->NumValidRows = T->NumRows;
3732  T->Next = TIntV(T->NumRows,0);
3733  for (TInt i = 0; i < T->NumRows-1; i++) {
3734  T->Next.Add(i+1);
3735  }
3736  T->LastValidRow = T->NumRows-1;
3737  T->Next.Add(Last);
3738  return T;
3739 }
3740 
3742  Schema SR;
3743  SR.Add(TPair<TStr,TAttrType>("edg_id",atInt));
3744  SR.Add(TPair<TStr,TAttrType>("src_id",atInt));
3745  SR.Add(TPair<TStr,TAttrType>("dst_id",atInt));
3746 
3747  TStrV IntAttrNames;
3748  TStrV FltAttrNames;
3749  TStrV StrAttrNames;
3750 
3751  TNEANet::TEdgeI EdgeI = Network->BegEI();
3752  EdgeI.GetIntAttrNames(IntAttrNames);
3753  EdgeI.GetFltAttrNames(FltAttrNames);
3754  EdgeI.GetStrAttrNames(StrAttrNames);
3755  for (TInt i = 0; i < IntAttrNames.Len(); i++) {
3756  SR.Add(TPair<TStr,TAttrType>(IntAttrNames[i],atInt));
3757  }
3758  for (TInt i = 0; i < FltAttrNames.Len(); i++) {
3759  SR.Add(TPair<TStr,TAttrType>(FltAttrNames[i],atFlt));
3760  }
3761  for (TInt i = 0; i < StrAttrNames.Len(); i++) {
3762  //printf("%s\n",StrAttrNames[i].CStr());
3763  SR.Add(TPair<TStr,TAttrType>(StrAttrNames[i],atStr));
3764  }
3765 
3766  PTable T = New(SR, Context);
3767 
3768  TInt Cnt = 0;
3769  // populate table columns
3770  while (EdgeI < Network->EndEI()) {
3771  T->IntCols[0].Add(EdgeI.GetId());
3772  T->IntCols[1].Add(EdgeI.GetSrcNId());
3773  T->IntCols[2].Add(EdgeI.GetDstNId());
3774  for (TInt i = 0; i < IntAttrNames.Len(); i++) {
3775  T->IntCols[i+3].Add(Network->GetIntAttrDatE(EdgeI,IntAttrNames[i]));
3776  }
3777  for (TInt i = 0; i < FltAttrNames.Len(); i++) {
3778  T->FltCols[i].Add(Network->GetFltAttrDatE(EdgeI,FltAttrNames[i]));
3779  }
3780  for (TInt i = 0; i < StrAttrNames.Len(); i++) {
3781  T->AddStrVal(i, Network->GetStrAttrDatE(EdgeI,StrAttrNames[i]));
3782  }
3783  Cnt++;
3784  EdgeI++;
3785  }
3786  // set number of rows and "Next" vector
3787  T->NumRows = Cnt;
3788  T->NumValidRows = T->NumRows;
3789  T->Next = TIntV(T->NumRows,0);
3790  for (TInt i = 0; i < T->NumRows-1; i++) {
3791  T->Next.Add(i+1);
3792  }
3793  T->LastValidRow = T->NumRows-1;
3794  T->Next.Add(Last);
3795  return T;
3796 }
3797 
3798 #ifdef GCC_ATOMIC
3800  Schema SR;
3801  SR.Add(TPair<TStr,TAttrType>("src_id",atInt));
3802  SR.Add(TPair<TStr,TAttrType>("dst_id",atInt));
3803 
3804  TNGraphMP::TEdgeI FirstEI = Network->BegEI();
3805  PTable T = New(SR, Context);
3806  TInt NumEdges = Network->GetEdges();
3807  TInt NumPartitions = omp_get_max_threads()*CHUNKS_PER_THREAD;
3808  TInt PartitionSize = NumEdges/NumPartitions;
3809  if (PartitionSize*NumPartitions < NumEdges) { NumPartitions++;}
3810 
3812  TVec<TEIPr> Partitions;
3813  TIntV PartitionSizes;
3814  TNGraphMP::TEdgeI currStart = FirstEI;
3815  TInt currCount = 0;
3816  while (FirstEI < Network->EndEI()){
3817  if (currCount == PartitionSize) {
3818  Partitions.Add(TEIPr(currStart, FirstEI));
3819  currStart = FirstEI;
3820  PartitionSizes.Add(currCount);
3821  //printf("added: %d\n", currCount.Val);
3822  currCount = 0;
3823  }
3824  //printf("%d\n", currCount.Val);
3825  FirstEI++;
3826  currCount++;
3827  }
3828  Partitions.Add(TEIPr(currStart, FirstEI));
3829  PartitionSizes.Add(currCount);
3830 
3831  T->ResizeTable(NumEdges);
3832  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
3833  for (int p = 0; p < Partitions.Len(); p++) {
3834  TNGraphMP::TEdgeI EdgeI = Partitions[p].GetVal1();
3835  TNGraphMP::TEdgeI EndI = Partitions[p].GetVal2();
3836  //printf("Thread = %d, p = %d, size = %d\n", omp_get_thread_num(), p, PartitionSizes[p].Val);
3837  int start = T->GetEmptyRowsStart(PartitionSizes[p]);
3838  while (EdgeI < EndI) {
3839  T->IntCols[0][start] = EdgeI.GetSrcNId();
3840  T->IntCols[1][start] = EdgeI.GetDstNId();
3841  EdgeI++;
3842  if (EdgeI < EndI) { T->Next[start] = start+1;}
3843  start++;
3844  }
3845  }
3846 
3847  Assert(T->NumRows == NumEdges);
3848  return T;
3849 }
3850 #endif // GCC_ATOMIC
3851 
3852 PTable TTable::GetFltNodePropertyTable(const PNEANet& Network, const TIntFltH& Property,
3853  const TStr& NodeAttrName, const TAttrType& NodeAttrType, const TStr& PropertyAttrName,
3854  TTableContext* Context) {
3855  Schema SR;
3856  // Determine type of node id
3857  SR.Add(TPair<TStr,TAttrType>(NodeAttrName,NodeAttrType));
3858  SR.Add(TPair<TStr,TAttrType>(PropertyAttrName,atFlt));
3859  PTable T = New(SR, Context);
3860  TInt NodeColIdx = T->GetColIdx(NodeAttrName);
3861  TInt Cnt = 0;
3862  // populate table columns
3863  for (TNEANet::TNodeI NodeI = Network->BegNI(); NodeI < Network->EndNI(); NodeI++) {
3864  switch (NodeAttrType) {
3865  case atInt:
3866  T->IntCols[NodeColIdx].Add(Network->GetIntAttrDatN(NodeI,NodeAttrName));
3867  break;
3868  case atFlt:
3869  T->FltCols[NodeColIdx].Add(Network->GetFltAttrDatN(NodeI,NodeAttrName));
3870  break;
3871  case atStr:
3872  T->AddStrVal(TInt(0), Network->GetStrAttrDatN(NodeI,NodeAttrName));
3873  break;
3874  }
3875  T->FltCols[0].Add(Property.GetDat(NodeI.GetId()));
3876  Cnt++;
3877  }
3878  // set number of rows and "Next" vector
3879  T->NumRows = Cnt;
3880  T->NumValidRows = T->NumRows;
3881  T->Next = TIntV(T->NumRows,0);
3882  for (TInt i = 0; i < T->NumRows-1; i++) {
3883  T->Next.Add(i+1);
3884  }
3885  T->LastValidRow = T->NumRows-1;
3886  T->Next.Add(Last);
3887  return T;
3888 }
3889 
3890 /*** Special Filters ***/
3891 PTable TTable::IsNextK(const TStr& OrderCol, TInt K, const TStr& GroupBy, const TStr& RankColName) {
3892  TStrV OrderBy;
3893  if (GroupBy.Empty()) {
3894  OrderBy.Add(OrderCol);
3895  } else {
3896  OrderBy.Add(GroupBy);
3897  OrderBy.Add(OrderCol);
3898  }
3899  if (RankColName.Empty()) {
3900  Order(OrderBy);
3901  } else {
3902  Order(OrderBy, RankColName, true);
3903  }
3904  TAttrType GroupByAttrType = GetColType(GroupBy);
3905  PTable T = InitializeJointTable(*this);
3906  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
3907  TInt Succ = RI.GetRowIdx();
3908  TBool OutOfGroup = false;
3909  for (TInt i = 0; i < K; i++) {
3910  Succ = Next[Succ];
3911  if (Succ == Last) { break; }
3912  switch (GroupByAttrType) {
3913  case atInt:
3914  if (GetIntVal(GroupBy, Succ) != RI.GetIntAttr(GroupBy)) { OutOfGroup = true; }
3915  break;
3916  case atFlt:
3917  if (GetFltVal(GroupBy, Succ) != RI.GetFltAttr(GroupBy)) { OutOfGroup = true; }
3918  break;
3919  case atStr:
3920  if (GetStrVal(GroupBy, Succ) != RI.GetStrAttr(GroupBy)) { OutOfGroup = true; }
3921  break;
3922  }
3923  if (OutOfGroup) { break; } // break out of inner for loop
3924  T->AddJointRow(*this, *this, RI.GetRowIdx(), Succ);
3925  }
3926  }
3927  return T;
3928 }
3929 
3931  printf("Total number of rows: %d\n", NumRows.Val);
3932  printf("Number of valid rows: %d\n", NumValidRows.Val);
3933  printf("Number of Int columns: %d\n", IntCols.Len());
3934  printf("Number of Flt columns: %d\n", FltCols.Len());
3935  printf("Number of Str columns: %d\n", StrColMaps.Len());
3936  TSize MemUsed = GetMemUsedKB();
3937  printf("Approximate table size is %s KB\n", TUInt64::GetStr(MemUsed).CStr());
3938 }
3939 
3941  TSize ApproxSize = 0;
3942  ApproxSize += Next.GetMemUsed()/1000; // Next vector
3943  for(int i = 0; i < IntCols.Len(); i++){
3944  ApproxSize += IntCols[i].GetMemUsed()/1000;
3945  }
3946  for(int i = 0; i < FltCols.Len(); i++){
3947  ApproxSize += FltCols[i].GetMemUsed()/1000;
3948  }
3949  for(int i = 0; i < StrColMaps.Len(); i++){
3950  ApproxSize += StrColMaps[i].GetMemUsed()/1000;
3951  }
3952  ApproxSize += RowIdMap.GetMemUsed()/1000;
3953  ApproxSize += GroupIDMapping.GetMemUsed()/1000;
3954  ApproxSize += GroupMapping.GetMemUsed()/1000;
3955  ApproxSize += RowIdBuckets.GetMemUsed() / 1000;
3956  return ApproxSize;
3957 }
3958 
3960  printf("Number of strings in pool: ");
3961  printf("%d\n", Context->StringVals.Len());
3962  printf("Number of entries in hash table: ");
3963  printf("%d\n", Context->StringVals.Reserved());
3964  TSize MemUsed = GetContextMemUsedKB();
3965  printf("Approximate context size is %s KB\n",
3966  TUInt64::GetStr(MemUsed).CStr());
3967 }
3968 
3970  TSize ApproxSize = 0;
3971  ApproxSize += Context->StringVals.GetMemUsed();
3972  return ApproxSize;
3973 }
3974 
// Appends all physical rows of T (valid and invalid slots alike) to this table.
// For every column in this table's schema the matching column of T is appended: this
// table's id column is matched to T's id column, every other column by name. Throws if
// T lacks one of the columns. T's Next links are shifted by NumRows so they index into
// the appended range; Last/Invalid markers are copied unchanged.
3975 void TTable::AddTable(const TTable& T) {
3976  //for (TInt c = 0; c < S.Len(); c++) {
3977  // if (S[c] != T.S[c]) { printf("(%s,%d) != (%s,%d)\n", S[c].Val1.CStr(), S[c].Val2, T.S[c].Val1.CStr(), T.S[c].Val2); TExcept::Throw("when adding tables, their schemas must match!"); }
3978  //}
3979  for (TInt c = 0; c < Sch.Len(); c++) {
3980  TStr ColName = GetSchemaColName(c);
3981  TInt ColIdx = GetColIdx(ColName);
3982  TInt TColIdx = ColName == IdColName ? T.GetColIdx(T.IdColName) : T.GetColIdx(ColName);
3983  if (TColIdx < 0) { TExcept::Throw("when adding a table, it must contain all columns of source table!"); }
3984  switch (GetColType(ColName)) {
3985  case atInt:
3986  IntCols[ColIdx].AddV(T.IntCols[TColIdx]);
3987  break;
3988  case atFlt:
3989  FltCols[ColIdx].AddV(T.FltCols[TColIdx]);
3990  break;
3991  case atStr:
3992  StrColMaps[ColIdx].AddV(T.StrColMaps[TColIdx]);
3993  break;
3994  }
3995  }
3996 
// shift T's row links by our current row count; terminal/invalid markers stay as-is
3997  TIntV TNext(T.Next);
3998  for (TInt i = 0; i < TNext.Len(); i++) {
3999  if (TNext[i] != Last && TNext[i] != Invalid) { TNext[i] += NumRows; }
4000  }
4001 
4002  Next.AddV(TNext);
// NOTE(review): the body of the following if (presumably linking our old last valid row
// to T's rows) and any LastValidRow / NumValidRows updates are not visible in this
// listing — confirm against the full source.
4003  // checks if table is empty
4004  if (LastValidRow >= 0) {
4006  }
4008  NumRows += T.NumRows;
4010 }
4011 
4012 // returns physical indices of rows of given table present in our table
4013 // we assume that schema matches exactly (including index of id cols)
// Adds to Collisions the physical row indices of Table's rows (iterated with
// Table.BegRI()/EndRI()) whose values in all non-id columns match a group built by
// GroupAux over the same columns of this table. Throws if the schemas differ (the id
// columns only need to sit at the same position).
// NOTE(review): Table.Sch is indexed with this table's column positions without first
// checking Table.Sch.Len() >= Sch.Len().
// NOTE(review): string columns are compared through string-pool ids (GetStrMapById), which
// presumably assumes both tables share the same context — confirm.
// NOTE(review): the declaration of Grouping is not visible in this listing.
4014 void TTable::GetCollidingRows(const TTable& Table, THashSet<TInt>& Collisions) {
4015  TIntV UniqueVec;
4017  TStrV GroupBy;
4018 
4019  // indices of columns of each type
4020  TIntV IntGroupByCols;
4021  TIntV FltGroupByCols;
4022  TIntV StrGroupByCols;
4023 
4024  TInt IKLen, FKLen, SKLen;
4025 
4026  // check that schemas match
4027  for (TInt c = 0; c < Sch.Len(); c++) {
4028  if (Sch[c].Val1 == IdColName) {
4029  if (Table.Sch[c].Val1 != Table.GetIdColName()) {
4030  TExcept::Throw("GetCollidingRows: schemas do not match!");
4031  }
4032  continue;
4033  }
4034  if (Sch[c] != Table.Sch[c]) {
4035  printf("(%s,%d) != (%s,%d)\n", Sch[c].Val1.CStr(), Sch[c].Val2, Table.Sch[c].Val1.CStr(), Table.Sch[c].Val2);
4036  TExcept::Throw("GetCollidingRows: schemas do not match!");
4037  }
4038  GroupBy.Add(NormalizeColName(Sch[c].Val1));
// column indices are resolved in Table, because keys are read from Table's rows below
4039  TPair<TAttrType, TInt> ColType = Table.GetColTypeMap(Sch[c].Val1);
4040  switch (ColType.Val1) {
4041  case atInt:
4042  IntGroupByCols.Add(ColType.Val2);
4043  break;
4044  case atFlt:
4045  FltGroupByCols.Add(ColType.Val2);
4046  break;
4047  case atStr:
4048  StrGroupByCols.Add(ColType.Val2);
4049  break;
4050  }
4051  }
4052 
4053  IKLen = IntGroupByCols.Len();
4054  FKLen = FltGroupByCols.Len();
4055  SKLen = StrGroupByCols.Len();
4056 
4057  // group rows of first table
4058  GroupAux(GroupBy, Grouping, true, "", false, UniqueVec, true);
4059 
4060  // find colliding rows of second table
4061  for (TRowIterator it = Table.BegRI(); it < Table.EndRI(); it++) {
4062  // read keys from row
4063  TIntV IKey(IKLen + SKLen, 0);
4064  TFltV FKey(FKLen, 0);
4065 
// key layout: int values, then string ids, all in IKey; float values in FKey
4066  // find group key
4067  for (TInt c = 0; c < IKLen; c++) {
4068  IKey.Add(it.GetIntAttr(IntGroupByCols[c]));
4069  }
4070  for (TInt c = 0; c < FKLen; c++) {
4071  FKey.Add(it.GetFltAttr(FltGroupByCols[c]));
4072  }
4073  for (TInt c = 0; c < SKLen; c++) {
4074  IKey.Add(it.GetStrMapById(StrGroupByCols[c]));
4075  }
4076  // look for group matching the key
4077  TGroupKey GroupKey = TGroupKey(IKey, FKey);
4078 
4079  TInt RowIdx = it.GetRowIdx();
4080  if (Grouping.IsKey(GroupKey)) {
4081  // row exists in first table
4082  Collisions.AddKey(RowIdx);
4083  }
4084  }
4085 }
4086 
// Adds a new int column ColName filled from ColVals: the i-th row visited by
// BegRI()/EndRI() receives ColVals[i]. If ColVals.Len() differs from NumRows, prints a
// message and returns without modifying the table.
// NOTE(review): the length check uses NumRows (all physical rows) while values are written
// only to iterated rows, so with invalid rows present trailing ColVals entries are ignored —
// confirm whether NumValidRows was intended.
4087 void TTable::StoreIntCol(const TStr& ColName, const TIntV& ColVals) {
4088  if (ColVals.Len() != NumRows) {
4089  printf("new column dimension must agree with number of rows\n");
4090  return;
4091  }
4092  AddSchemaCol(ColName, atInt);
// NOTE(review): no statement appending the new column's storage to IntCols is visible here
// in this listing; ColIdx = IntCols.Len()-1 relies on it having been appended — confirm.
4094  TInt ColIdx = IntCols.Len()-1;
4095  TInt i = 0;
4096  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
4097  IntCols[ColIdx][RI.GetRowIdx()] = ColVals[i];
4098  i++;
4099  }
4100  TInt L = IntCols.Len();
4101  AddColType(ColName, atInt, L-1);
4102 }
4103 
// Adds a new float column ColName filled from ColVals: the i-th row visited by
// BegRI()/EndRI() receives ColVals[i]. If ColVals.Len() differs from NumRows, prints a
// message and returns without modifying the table.
// NOTE(review): the length check uses NumRows (all physical rows) while values are written
// only to iterated rows — confirm whether NumValidRows was intended.
4104 void TTable::StoreFltCol(const TStr& ColName, const TFltV& ColVals) {
4105  if (ColVals.Len() != NumRows) {
4106  printf("new column dimension must agree with number of rows\n");
4107  return;
4108  }
4109  AddSchemaCol(ColName, atFlt);
// NOTE(review): no statement appending the new column's storage to FltCols is visible here
// in this listing; ColIdx = FltCols.Len()-1 relies on it having been appended — confirm.
4111  TInt ColIdx = FltCols.Len()-1;
4112  TInt i = 0;
4113  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
4114  FltCols[ColIdx][RI.GetRowIdx()] = ColVals[i];
4115  i++;
4116  }
4117  TInt L = FltCols.Len();
4118  AddColType(ColName, atFlt, L-1);
4119 }
4120 
// Adds a new string column ColName filled from ColVals: the i-th row visited by
// BegRI()/EndRI() receives the string-pool id of ColVals[i]; strings not yet in
// Context->StringVals are added to the pool. If ColVals.Len() differs from NumRows, prints a
// message and returns without modifying the table.
4121 void TTable::StoreStrCol(const TStr& ColName, const TStrV& ColVals) {
4122  if (ColVals.Len() != NumRows) {
4123  printf("new column dimension must agree with number of rows\n");
4124  return;
4125  }
4126  AddSchemaCol(ColName, atStr);
// NOTE(review): BUG — ColIdx is derived from FltCols.Len() but is used to index StrColMaps;
// it should presumably be StrColMaps.Len()-1, matching the AddColType call below.
// NOTE(review): no statement appending the new column's storage to StrColMaps is visible
// in this listing — confirm against the full source.
4128  TInt ColIdx = FltCols.Len()-1;
4129  TInt i = 0;
4130  for (TRowIterator RI = BegRI(); RI < EndRI(); RI++) {
4131  TInt Key = Context->StringVals.GetKeyId(ColVals[i]);
// NOTE(review): BUG — when the string is new, AddKey's result is discarded and Key stays -1,
// so -1 is stored in the column; presumably this should be
// Key = Context->StringVals.AddKey(ColVals[i]) (assuming AddKey returns the new key id).
4132  if (Key == -1) { Context->StringVals.AddKey(ColVals[i]); }
4133  StrColMaps[ColIdx][RI.GetRowIdx()] = Key;
4134  i++;
4135  }
4136  TInt L = StrColMaps.Len();
4137  AddColType(ColName, atStr, L-1);
4138 }
4139 
4141  if (LastValidRow >= 0) {
4143  }
4144  Next.Add(Last);
4146 
4147  NumRows++;
4148  NumValidRows++;
4149 }
4150 
4151 #ifdef GCC_ATOMIC
4152 void TTable::SetFltColToConstMP(TInt UpdateColIdx, TFlt DefaultFltVal){
4153  if(!GetMP()){ TExcept::Throw("Not Using MP!");}
4154  TIntPrV Partitions;
4155  GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
4156  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
4157  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
4158  for (int i = 0; i < Partitions.Len(); i++){
4159  TRowIterator RowI(Partitions[i].GetVal1(), this);
4160  TRowIterator EndI(Partitions[i].GetVal2(), this);
4161  while(RowI < EndI){
4162  FltCols[UpdateColIdx][RowI.GetRowIdx()] = DefaultFltVal;
4163  RowI++;
4164  }
4165  }
4166 }
4167 
4168 // OP RS 2016/06/30: this wrapper function is required
4169 // for the code to compile on Mac OS X gcc 4.2.1
4171  return(__sync_bool_compare_and_swap(lock, 0, 1));
4172 }
4173 
4174 void TTable::UpdateFltFromTableMP(const TStr& KeyAttr, const TStr& UpdateAttr,
4175  const TTable& Table, const TStr& FKeyAttr, const TStr& ReadAttr,
4176  TFlt DefaultFltVal) {
4177  if (!GetMP()) {
4178  TExcept::Throw("Not Using MP!");
4179  }
4180 
4181  TAttrType KeyType = GetColType(KeyAttr);
4182  TAttrType FKeyType = Table.GetColType(FKeyAttr);
4183  if(KeyType != FKeyType){TExcept::Throw("Key Type Mismatch");}
4184  if(GetColType(UpdateAttr) != atFlt || Table.GetColType(ReadAttr) != atFlt){
4185  TExcept::Throw("Expecting Float values");
4186  }
4187  TStr NKeyAttr = NormalizeColName(KeyAttr);
4188  //TStr NUpdateAttr = NormalizeColName(UpdateAttr);
4189  //TStr NFKeyAttr = Table.NormalizeColName(FKeyAttr);
4190  //TStr NReadAttr = Table.NormalizeColName(ReadAttr);
4191  TInt UpdateColIdx = GetColIdx(UpdateAttr);
4192  TInt FKeyColIdx = GetColIdx(FKeyAttr);
4193  TInt ReadColIdx = GetColIdx(ReadAttr);
4194 
4195  // TODO: this should be a generic vector operation
4196  SetFltColToConstMP(UpdateColIdx, DefaultFltVal);
4197 
4198  TIntPrV Partitions;
4199  Table.GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
4200  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
4201  TIntV Locks(NumRows);
4202  Locks.PutAll(0); // need to parallelize this...
4203 
4204  switch (KeyType) {
4205  // TODO: add support for other cases of KeyType
4206  case atInt: {
4207  THashMP<TInt,TIntV> Grouping;
4208  // must use physical row ids
4209  GroupByIntColMP(NKeyAttr, Grouping, true);
4210  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) // num_threads(1)
4211  for (int i = 0; i < Partitions.Len(); i++) {
4212  TRowIterator RowI(Partitions[i].GetVal1(), &Table);
4213  TRowIterator EndI(Partitions[i].GetVal2(), &Table);
4214  while (RowI < EndI) {
4215  TInt K = RowI.GetIntAttr(FKeyColIdx);
4216  if (Grouping.IsKey(K)) {
4217  TIntV& UpdateRows = Grouping.GetDat(K);
4218  for (int j = 0; j < UpdateRows.Len(); j++) {
4219  int* lock = &Locks[UpdateRows[j]].Val;
4220  // OP RS 2016/06/30: needed to define a wrapper function
4221  // for the code to compile on Mac OS X gcc 4.2.1
4222  //if (!__sync_bool_compare_and_swap(lock, 0, 1)) {
4223  if (!sync_bool_compare_and_swap(lock)) {
4224  continue;
4225  }
4226  //printf("key = %d, row = %d, old_score = %f\n", K.Val, j, UpdateRows[j].Val, FltCols[UpdateColIdx][UpdateRows[j]].Val);
4227  FltCols[UpdateColIdx][UpdateRows[j]] = RowI.GetFltAttr(ReadColIdx);
4228  //printf("key = %d, new_score = %f\n", K.Val, j, FltCols[UpdateColIdx][UpdateRows[j]].Val);
4229  } // end of for loop
4230  } // end of if statement
4231  RowI++;
4232  } // end of while loop
4233  } // end of for loop
4234  } // end of case atInt
4235  break;
4236  default:
4237  break;
4238  } // end of outer switch statement
4239 }
4240 #endif // GCC_ATOMIC
4241 
4242 void TTable::UpdateFltFromTable(const TStr& KeyAttr, const TStr& UpdateAttr, const TTable& Table,
4243  const TStr& FKeyAttr, const TStr& ReadAttr, TFlt DefaultFltVal){
4244  if(!IsColName(KeyAttr)){ TExcept::Throw("Bad KeyAttr parameter");}
4245  if(!IsColName(UpdateAttr)){ TExcept::Throw("Bad UpdateAttr parameter");}
4246  if(!Table.IsColName(FKeyAttr)){ TExcept::Throw("Bad FKeyAttr parameter");}
4247  if(!Table.IsColName(ReadAttr)){ TExcept::Throw("Bad ReadAttr parameter");}
4248 
4249 #ifdef GCC_ATOMIC
4250  if(GetMP()){
4251  UpdateFltFromTableMP(KeyAttr, UpdateAttr,Table, FKeyAttr, ReadAttr, DefaultFltVal);
4252  return;
4253  }
4254 #endif // GCC_ATOMIC
4255 
4256  TAttrType KeyType = GetColType(KeyAttr);
4257  TAttrType FKeyType = Table.GetColType(FKeyAttr);
4258  if(KeyType != FKeyType){TExcept::Throw("Key Type Mismatch");}
4259  if(GetColType(UpdateAttr) != atFlt || Table.GetColType(ReadAttr) != atFlt){
4260  TExcept::Throw("Expecting Float values");
4261  }
4262  TStr NKeyAttr = NormalizeColName(KeyAttr);
4263  TStr NUpdateAttr = NormalizeColName(UpdateAttr);
4264  TStr NFKeyAttr = Table.NormalizeColName(FKeyAttr);
4265  TStr NReadAttr = Table.NormalizeColName(ReadAttr);
4266  TInt UpdateColIdx = GetColIdx(UpdateAttr);
4267 
4268  for(TRowIterator iter = BegRI(); iter < EndRI(); iter++){
4269  FltCols[UpdateColIdx][iter.GetRowIdx()] = DefaultFltVal;
4270  }
4271 
4272  switch(KeyType) {
4273  // TODO: add support for other cases of KeyType
4274  case atInt: {
4275  TIntIntVH Grouping;
4276  GroupByIntCol(NKeyAttr, Grouping, TIntV(), true, true);
4277  for (TRowIterator RI = Table.BegRI(); RI < Table.EndRI(); RI++) {
4278  TInt K = RI.GetIntAttr(NFKeyAttr);
4279  if (Grouping.IsKey(K)) {
4280  TIntV& UpdateRows = Grouping.GetDat(K);
4281  for (int i = 0; i < UpdateRows.Len(); i++) {
4282  FltCols[UpdateColIdx][UpdateRows[i]] = RI.GetFltAttr(NReadAttr);
4283  } // end of for loop
4284  } // end of if statement
4285  } // end of for loop
4286  } // end of case atInt
4287  break;
4288  default:
4289  break;
4290  } // end of outer switch statement
4291 }
4292 
4293 
4294 // can ONLY be called when a table is being initialised (before IDs are allocated)
4296  for (TInt c = 0; c < Sch.Len(); c++) {
4297  TStr ColName = GetSchemaColName(c);
4298  if (ColName == IdColName) { continue; }
4299 
4300  TInt ColIdx = GetColIdx(ColName);
4301 
4302  switch (GetColType(ColName)) {
4303  case atInt:
4304  IntCols[ColIdx].Add(RI.GetIntAttr(ColName));
4305  break;
4306  case atFlt:
4307  FltCols[ColIdx].Add(RI.GetFltAttr(ColName));
4308  break;
4309  case atStr:
4310  StrColMaps[ColIdx].Add(RI.GetStrMapByName(ColName));
4311  break;
4312  }
4313  }
4315 }
4316 
// Appends one row: IntVals[c] to int column c, FltVals[c] to float column c and StrVals[c]
// to string column c (through AddStrVal). Values are matched to columns purely by position;
// there is no check that the vector lengths match the column counts.
// NOTE(review): the row bookkeeping after the loops (row count / Next links) is not visible
// in this listing — confirm against the full source.
4317 void TTable::AddRowV(const TIntV& IntVals, const TFltV& FltVals, const TStrV& StrVals) {
4318  for (TInt c = 0; c < IntVals.Len(); c++) {
4319  IntCols[c].Add(IntVals[c]);
4320  }
4321  for (TInt c = 0; c < FltVals.Len(); c++) {
4322  FltCols[c].Add(FltVals[c]);
4323  }
4324  for (TInt c = 0; c < StrVals.Len(); c++) {
4325  AddStrVal(c, StrVals[c]);
4326  }
4328 }
4329 
// Resizes every column vector and the Next vector to RowCount physical rows. They are grown
// with Reserve(RowCount, RowCount) when Next is shorter than RowCount and truncated with
// Trunc(RowCount) when it is longer. In each loop, iteration i handles exactly one vector:
// int columns first, then float columns, then string columns, and finally Next
// (i == TotalCols). This lets the vectors be resized in parallel under OpenMP.
// NOTE(review): part of the RowCount == 0 branch is not visible in this listing; confirm
// whether it returns early and/or resets other bookkeeping (e.g. LastValidRow).
4330 void TTable::ResizeTable(int RowCount) {
4331  if (RowCount == 0) {
4332  // initialize empty table
4333  NumValidRows = 0;
4336  }
4337  if (Next.Len() < RowCount) {
4338  TInt FltOffset = IntCols.Len();
4339  TInt StrOffset = FltOffset + FltCols.Len();
4340  TInt TotalCols = StrOffset + StrColMaps.Len();
4341 #ifdef USE_OPENMP
4342  #pragma omp parallel for schedule(static)
4343 #endif
4344  for (int i = 0; i < TotalCols+1; i++) {
4345  if (i < FltOffset) {
4346  IntCols[i].Reserve(RowCount, RowCount);
4347  } else if (i < StrOffset) {
4348  FltCols[i-FltOffset].Reserve(RowCount, RowCount);
4349  } else if (i < TotalCols) {
4350  StrColMaps[i-StrOffset].Reserve(RowCount, RowCount);
4351  } else {
4352  Next.Reserve(RowCount, RowCount);
4353  }
4354  }
4355  } else if (Next.Len() > RowCount) {
4356  TInt FltOffset = IntCols.Len();
4357  TInt StrOffset = FltOffset + FltCols.Len();
4358  TInt TotalCols = StrOffset + StrColMaps.Len();
4359 #ifdef USE_OPENMP
4360  #pragma omp parallel for schedule(static)
4361 #endif
4362  for (int i = 0; i < TotalCols+1; i++) {
4363  if (i < FltOffset) {
4364  IntCols[i].Trunc(RowCount);
4365  } else if (i < StrOffset) {
4366  FltCols[i-FltOffset].Trunc(RowCount);
4367  } else if (i < TotalCols) {
4368  StrColMaps[i-StrOffset].Trunc(RowCount);
4369  } else {
4370  Next.Trunc(RowCount);
4371  }
4372  }
4373  }
4374 }
4375 
4376 int TTable::GetEmptyRowsStart(int NewRows) {
4377  int start = -1;
4378 #ifdef USE_OPENMP
4379  #pragma omp critical
4380  {
4381 #endif
4382  start = NumRows;
4383  NumRows += NewRows;
4384  NumValidRows += NewRows;
4385  // To make this function thread-safe, the following call must be done before the
4386  // code enters parallel region.
4387  // ResizeTable(NumRows);
4388  Assert(NumRows <= Next.Len());
4389  if (LastValidRow >= 0) {Next[LastValidRow] = start;}
4390  LastValidRow = start+NewRows-1;
4391  Next[LastValidRow] = Last;
4392 #ifdef USE_OPENMP
4393  }
4394 #endif
4395  Assert (start >= 0);
4396  return start;
4397 }
4398 
4399 void TTable::AddSelectedRows(const TTable& Table, const TIntV& RowIDs) {
4400  int NewRows = RowIDs.Len();
4401  if (NewRows == 0) { return; }
4402  // this call should be thread-safe
4403  int start = GetEmptyRowsStart(NewRows);
4404  for (TInt r = 0; r < NewRows; r++) {
4405  TInt CurrRowIdx = RowIDs[r];
4406  for (TInt i = 0; i < Table.IntCols.Len(); i++) {
4407  IntCols[i][start+r] = Table.IntCols[i][CurrRowIdx];
4408  }
4409  for (TInt i = 0; i < Table.FltCols.Len(); i++) {
4410  FltCols[i][start+r] = Table.FltCols[i][CurrRowIdx];
4411  }
4412  for (TInt i = 0; i < Table.StrColMaps.Len(); i++) {
4413  StrColMaps[i][start+r] = Table.StrColMaps[i][CurrRowIdx];
4414  }
4415  }
4416  for (TInt r = 0; r < NewRows-1; r++) {
4417  Next[start+r] = start+r+1;
4418  }
4419 }
4420 
4421 void TTable::AddNRows(int NewRows, const TVec<TIntV>& IntColsP, const TVec<TFltV>& FltColsP, const TVec<TIntV>& StrColMapsP) {
4422  if (NewRows == 0) { return; }
4423  // this call should be thread-safe
4424  int start = GetEmptyRowsStart(NewRows);
4425  for (TInt r = 0; r < NewRows; r++) {
4426  for (TInt i = 0; i < IntColsP.Len(); i++) {
4427  IntCols[i][start+r] = IntColsP[i][r];
4428  }
4429  for (TInt i = 0; i < FltColsP.Len(); i++) {
4430  FltCols[i][start+r] = FltColsP[i][r];
4431  }
4432  for (TInt i = 0; i < StrColMapsP.Len(); i++) {
4433  StrColMaps[i][start+r] = StrColMapsP[i][r];
4434  }
4435  }
4436  for (TInt r = 0; r < NewRows-1; r++) {
4437  Next[start+r] = start+r+1;
4438  }
4439 }
4440 
4441 #ifdef USE_OPENMP
4442 void TTable::AddNJointRowsMP(const TTable& T1, const TTable& T2, const TVec<TIntPrV>& JointRowIDSet) {
4443  //double startFn = omp_get_wtime();
4444  int JointTableSize = 0;
4445  TIntV StartOffsets(JointRowIDSet.Len());
4446  for (int i = 0; i < JointRowIDSet.Len(); i++) {
4447  StartOffsets[i] = JointTableSize;
4448  JointTableSize += JointRowIDSet[i].Len();
4449  }
4450  if (JointTableSize == 0) {
4451  TExcept::Throw("Joint table is empty");
4452  }
4453  //double endOffsets = omp_get_wtime();
4454  //printf("Offsets time = %f\n",endOffsets-startFn);
4455  ResizeTable(JointTableSize);
4456  //double endResize = omp_get_wtime();
4457  //printf("Resize time = %f\n",endResize-endOffsets);
4458  NumRows = JointTableSize;
4459  NumValidRows = JointTableSize;
4460  Assert(NumRows <= Next.Len());
4461 
4462  TInt IntOffset = T1.IntCols.Len();
4463  TInt FltOffset = T1.FltCols.Len();
4464  TInt StrOffset = T1.StrColMaps.Len();
4465 
4466  TInt IdOffset = IntOffset + T2.IntCols.Len();
4467  RowIdMap.Clr();
4468  for (TInt IdCnt = 0; IdCnt < JointTableSize; IdCnt++) {
4469  RowIdMap.AddDat(IdCnt, IdCnt);
4470  }
4471 
4472  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
4473  for (int j = 0; j < JointRowIDSet.Len(); j++) {
4474  const TIntPrV& RowIDs = JointRowIDSet[j];
4475  int start = StartOffsets[j];
4476  int NewRows = RowIDs.Len();
4477  if (NewRows == 0) {continue;}
4478  for (TInt r = 0; r < NewRows; r++){
4479  TIntPr CurrRowIdPr = RowIDs[r];
4480  for(TInt i = 0; i < T1.IntCols.Len(); i++){
4481  IntCols[i][start+r] = T1.IntCols[i][CurrRowIdPr.GetVal1()];
4482  }
4483  for(TInt i = 0; i < T1.FltCols.Len(); i++){
4484  FltCols[i][start+r] = T1.FltCols[i][CurrRowIdPr.GetVal1()];
4485  }
4486  for(TInt i = 0; i < T1.StrColMaps.Len(); i++){
4487  StrColMaps[i][start+r] = T1.StrColMaps[i][CurrRowIdPr.GetVal1()];
4488  }
4489  for(TInt i = 0; i < T2.IntCols.Len(); i++){
4490  IntCols[i+IntOffset][start+r] = T2.IntCols[i][CurrRowIdPr.GetVal2()];
4491  }
4492  for(TInt i = 0; i < T2.FltCols.Len(); i++){
4493  FltCols[i+FltOffset][start+r] = T2.FltCols[i][CurrRowIdPr.GetVal2()];
4494  }
4495  for(TInt i = 0; i < T2.StrColMaps.Len(); i++){
4496  StrColMaps[i+StrOffset][start+r] = T2.StrColMaps[i][CurrRowIdPr.GetVal2()];
4497  }
4498  IntCols[IdOffset][start+r] = start+r;
4499  }
4500  for(TInt r = 0; r < NewRows; r++){
4501  Next[start+r] = start+r+1;
4502  }
4503  }
4504  LastValidRow = JointTableSize-1;
4505  Next[LastValidRow] = Last;
4506  //double endIterate = omp_get_wtime();
4507  //printf("Iterate time = %f\n",endIterate-endResize);
4508 }
4509 #endif // USE_OPENMP
4510 
4512  Schema NewSchema;
4513  for (TInt c = 0; c < Sch.Len(); c++) {
4514  if (Sch[c].Val1 != GetIdColName()) {
4515  NewSchema.Add(TPair<TStr,TAttrType>(Sch[c].Val1, Sch[c].Val2));
4516  }
4517  }
4518  PTable result = TTable::New(NewSchema, Context);
4519  result->AddTable(*this);
4520  result->UnionAllInPlace(Table);
4521  return result;
4522 }
4523 
4524 void TTable::UnionAllInPlace(const TTable& Table) {
4525  AddTable(Table);
4526  // TODO: For the moment, IDs are not initialized (to avoid having too many ID columns)
4527  //result->InitIds();
4528 }
4529 
4530 
4531 PTable TTable::Union(const TTable& Table) {
4532  Schema NewSchema;
4533  THashSet<TInt> Collisions;
4534  TStrV ColNames;
4535 
4536  for (TInt c = 0; c < Sch.Len(); c++) {
4537  if (Sch[c].Val1 != GetIdColName()) {
4538  NewSchema.Add(TPair<TStr,TAttrType>(Sch[c].Val1, Sch[c].Val2));
4539  ColNames.Add(Sch[c].Val1);
4540  }
4541  }
4542  PTable result = TTable::New(NewSchema, Context);
4543 
4544  GetCollidingRows(Table, Collisions);
4545 
4546  result->AddTable(*this);
4547 
4548  result->Unique(ColNames);
4549 
4550  // this part should be made faster by adding all the rows in one go
4551  for (TRowIterator it = Table.BegRI(); it < Table.EndRI(); it++) {
4552  if (!Collisions.IsKey(it.GetRowIdx())) {
4553  result->AddRowI(it);
4554  }
4555  }
4556 
4557  // printf("this: %d %d, table: %d %d, result: %d %d\n",
4558  // this->GetNumRows().Val, this->GetNumValidRows().Val,
4559  // Table.GetNumRows().Val, Table.GetNumValidRows().Val,
4560  // result->GetNumRows().Val, result->GetNumValidRows().Val);
4561 
4562  result->InitIds();
4563  return result;
4564 }
4565 
4566 
4568  Schema NewSchema;
4569  THashSet<TInt> Collisions;
4570 
4571  for (TInt c = 0; c < Sch.Len(); c++) {
4572  if (Sch[c].Val1 != GetIdColName()) {
4573  NewSchema.Add(TPair<TStr,TAttrType>(Sch[c].Val1, Sch[c].Val2));
4574  }
4575  }
4576  PTable result = TTable::New(NewSchema, Context);
4577 
4578  GetCollidingRows(Table, Collisions);
4579 
4580  // this part should be made faster by adding all the rows in one go
4581  for (TRowIterator it = Table.BegRI(); it < Table.EndRI(); it++) {
4582  if (Collisions.IsKey(it.GetRowIdx())) {
4583  result->AddRowI(it);
4584  }
4585  }
4586  result->InitIds();
4587  return result;
4588 }
4589 
4590 // TTable cannot be const because we will eventually call Table->GroupAux
4591 // as of now, GroupAux cannot be const because it modifies the table in some cases
4593  Schema NewSchema;
4594  THashSet<TInt> Collisions;
4595 
4596  for (TInt c = 0; c < Sch.Len(); c++) {
4597  if (Sch[c].Val1 != GetIdColName()) {
4598  NewSchema.Add(TPair<TStr,TAttrType>(Sch[c].Val1, Sch[c].Val2));
4599  }
4600  }
4601  PTable result = TTable::New(NewSchema, Context);
4602 
4603  Table.GetCollidingRows(*this, Collisions);
4604 
4605  // this part should be made faster by adding all the rows in one go
4606  for (TRowIterator it = BegRI(); it < EndRI(); it++) {
4607  if (!Collisions.IsKey(it.GetRowIdx())) {
4608  result->AddRowI(it);
4609  }
4610  }
4611  result->InitIds();
4612  return result;
4613 }
4614 
4615 PTable TTable::Project(const TStrV& ProjectCols) {
4616  Schema NewSchema;
4617  for (TInt c = 0; c < ProjectCols.Len(); c++) {
4618  if (!IsColName(ProjectCols[c])) { TExcept::Throw("no such column " + ProjectCols[c]); }
4619  NewSchema.Add(TPair<TStr,TAttrType>(ProjectCols[c], GetColType(ProjectCols[c])));
4620  }
4621 
4622  PTable result = TTable::New(NewSchema, Context);
4623  result->AddTable(*this);
4624  result->InitIds();
4625  return result;
4626 }
4627 
4628 TBool TTable::IsAttr(const TStr& Attr) {
4629  return IsColName(Attr);
4630 }
4631 
4632 TStr TTable::RenumberColName(const TStr& ColName) const {
4633  TStr NColName = ColName;
4634  if (NColName.GetCh(NColName.Len()-2) == '-') {
4635  NColName = NColName.GetSubStr(0,NColName.Len()-3);
4636  }
4637  TInt Conflicts = 0;
4638  for (TInt i = 0; i < Sch.Len(); i++) {
4639  if (NColName == Sch[i].Val1.GetSubStr(0, Sch[i].Val1.Len()-3)) {
4640  Conflicts++;
4641  }
4642  }
4643  Conflicts++;
4644  NColName = NColName + "-" + Conflicts.GetStr();
4645  return NColName;
4646 }
4647 
4648 TStr TTable::DenormalizeColName(const TStr& ColName) const {
4649  TStr DColName = ColName;
4650  if (DColName.Len() == 0) { return DColName; }
4651  if (DColName.GetCh(0) == '_') { return DColName; }
4652  if (DColName.GetCh(DColName.Len()-2) == '-') {
4653  DColName = DColName.GetSubStr(0,DColName.Len()-3);
4654  }
4655  TInt Conflicts = 0;
4656  for (TInt i = 0; i < Sch.Len(); i++) {
4657  if (DColName == Sch[i].Val1.GetSubStr(0, Sch[i].Val1.Len()-3)) {
4658  Conflicts++;
4659  }
4660  }
4661  if (Conflicts > 1) { return ColName; }
4662  else { return DColName; }
4663 }
4664 
4666  Schema DSch;
4667  for (TInt i = 0; i < Sch.Len(); i++) {
4668  DSch.Add(TPair<TStr, TAttrType>(DenormalizeColName(Sch[i].Val1), Sch[i].Val2));
4669  }
4670  return DSch;
4671 }
4672 
// Adds an int column named ColName to the schema and registers it (AddColType) as the
// last entry of IntCols.
// NOTE(review): the statement appending the column's storage to IntCols is not visible in
// this listing; the L-1 index assumes it has already been appended — confirm.
4673 void TTable::AddIntCol(const TStr& ColName) {
4674  AddSchemaCol(ColName, atInt);
4676  TInt L = IntCols.Len();
4677  AddColType(ColName, atInt, L-1);
4678 }
4679 
// Adds a float column named ColName to the schema and registers it (AddColType) as the
// last entry of FltCols.
// NOTE(review): the statement appending the column's storage to FltCols is not visible in
// this listing; the L-1 index assumes it has already been appended — confirm.
4680 void TTable::AddFltCol(const TStr& ColName) {
4681  AddSchemaCol(ColName, atFlt);
4683  TInt L = FltCols.Len();
4684  AddColType(ColName, atFlt, L-1);
4685 }
4686 
// Adds a string column named ColName to the schema and registers it (AddColType) as the
// last entry of StrColMaps.
// NOTE(review): the statement appending the column's storage to StrColMaps is not visible
// in this listing; the L-1 index assumes it has already been appended — confirm.
4687 void TTable::AddStrCol(const TStr& ColName) {
4688  AddSchemaCol(ColName, atStr);
4690  TInt L = StrColMaps.Len();
4691  AddColType(ColName, atStr, L-1);
4692 }
4693 
// Adds an int label column LabelName. Every physical row (0 .. NumRows-1) gets NegativeLabel,
// then each row whose index appears in SelectedRows gets PositiveLabel. SelectedRows
// presumably holds physical row indices — confirm with callers.
// LabelColIdx = IntCols.Len() is read before the new column's storage exists, i.e. it is the
// index the new column will occupy.
// NOTE(review): the statement appending that storage is not visible in this listing.
4694 void TTable::ClassifyAux(const TIntV& SelectedRows, const TStr& LabelName, const TInt& PositiveLabel, const TInt& NegativeLabel) {
4695  AddSchemaCol(LabelName, atInt);
4696  TInt LabelColIdx = IntCols.Len();
4697  AddColType(LabelName, atInt, LabelColIdx);
4699  for (TInt i = 0; i < NumRows; i++) {
4700  IntCols[LabelColIdx][i] = NegativeLabel;
4701  }
4702  for (TInt i = 0; i < SelectedRows.Len(); i++) {
4703  IntCols[LabelColIdx][SelectedRows[i]] = PositiveLabel;
4704  }
4705 }
4706 
4707 #ifdef USE_OPENMP
4708 void TTable::ColGenericOpMP(TInt ArgColIdx1, TInt ArgColIdx2, TAttrType ArgType1, TAttrType ArgType2, TInt ResColIdx, TArithOp op){
4709  TAttrType ResType = atFlt;
4710  if(ArgType1 == atInt && ArgType2 == atInt){ ResType = atInt;}
4711  TIntPrV Partitions;
4712  GetPartitionRanges(Partitions, omp_get_max_threads()*CHUNKS_PER_THREAD);
4713  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
4714  #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
4715  for (int i = 0; i < Partitions.Len(); i++){
4716  TRowIterator RowI(Partitions[i].GetVal1(), this);
4717  TRowIterator EndI(Partitions[i].GetVal2(), this);
4718  while(RowI < EndI){
4719  if(ResType == atInt){
4720  TInt V1 = RowI.GetIntAttr(ArgColIdx1);
4721  TInt V2 = RowI.GetIntAttr(ArgColIdx2);
4722  if (op == aoAdd) { IntCols[ResColIdx][RowI.GetRowIdx()] = V1 + V2; }
4723  if (op == aoSub) { IntCols[ResColIdx][RowI.GetRowIdx()] = V1 - V2; }
4724  if (op == aoMul) { IntCols[ResColIdx][RowI.GetRowIdx()] = V1 * V2; }
4725  if (op == aoDiv) { IntCols[ResColIdx][RowI.GetRowIdx()] = V1 / V2; }
4726  if (op == aoMod) { IntCols[ResColIdx][RowI.GetRowIdx()] = V1 % V2; }
4727  if (op == aoMin) { IntCols[ResColIdx][RowI.GetRowIdx()] = (V1 < V2) ? V1 : V2;}
4728  if (op == aoMax) { IntCols[ResColIdx][RowI.GetRowIdx()] = (V1 > V2) ? V1 : V2;}
4729  } else{
4730  TFlt V1 = (ArgType1 == atInt) ? (TFlt)RowI.GetIntAttr(ArgColIdx1) : RowI.GetFltAttr(ArgColIdx1);
4731  TFlt V2 = (ArgType2 == atInt) ? (TFlt)RowI.GetIntAttr(ArgColIdx2) : RowI.GetFltAttr(ArgColIdx2);
4732  if (op == aoAdd) { FltCols[ResColIdx][RowI.GetRowIdx()] = V1 + V2; }
4733  if (op == aoSub) { FltCols[ResColIdx][RowI.GetRowIdx()] = V1 - V2; }
4734  if (op == aoMul) { FltCols[ResColIdx][RowI.GetRowIdx()] = V1 * V2; }
4735  if (op == aoDiv) { FltCols[ResColIdx][RowI.GetRowIdx()] = V1 / V2; }
4736  if (op == aoMod) { TExcept::Throw("Cannot find modulo for float columns"); }
4737  if (op == aoMin) { FltCols[ResColIdx][RowI.GetRowIdx()] = (V1 < V2) ? V1 : V2;}
4738  if (op == aoMax) { FltCols[ResColIdx][RowI.GetRowIdx()] = (V1 > V2) ? V1 : V2;}
4739  }
4740  RowI++;
4741  }
4742  }
4743 }
4744 #endif // USE_OPENMP
4745 
4746 /* Performs generic operations on two numeric attributes
4747  * Operation can be +, -, *, /, %, min or max
4748  * Alternative is to write separate functions for each operation
4749  * Branch prediction may result in as fast performance anyway ?
4750  *
4751  */
4752 void TTable::ColGenericOp(const TStr& Attr1, const TStr& Attr2, const TStr& ResAttr, TArithOp op) {
4753  // check if attributes are valid
4754  if (!IsAttr(Attr1)) TExcept::Throw("No attribute present: " + Attr1);
4755  if (!IsAttr(Attr2)) TExcept::Throw("No attribute present: " + Attr2);
4756  TPair<TAttrType, TInt> Info1 = GetColTypeMap(Attr1);
4757  TPair<TAttrType, TInt> Info2 = GetColTypeMap(Attr2);
4758  TAttrType Arg1Type = Info1.Val1;
4759  TAttrType Arg2Type = Info2.Val1;
4760  if (Arg1Type == atStr || Arg2Type == atStr) {
4761  TExcept::Throw("Only numeric columns supported in arithmetic operations.");
4762  }
4763  if(Arg1Type == atInt && Arg2Type == atFlt && ResAttr == ""){
4764  TExcept::Throw("Trying to write float values to an existing int-typed column");
4765  }
4766  // source column indices
4767  TInt ColIdx1 = Info1.Val2;
4768  TInt ColIdx2 = Info2.Val2;
4769 
4770  // destination column index
4771  TInt ColIdx3 = ColIdx1;
4772  // Create empty result column with type that of first attribute
4773  if (ResAttr != "") {
4774  if (Arg1Type == atInt && Arg2Type == atInt) {
4775  AddIntCol(ResAttr);
4776  }
4777  else {
4778  AddFltCol(ResAttr);
4779  }
4780  ColIdx3 = GetColIdx(ResAttr);
4781  }
4782 #ifdef USE_OPENMP
4783  if(GetMP()){
4784  ColGenericOpMP(ColIdx1, ColIdx2, Arg1Type, Arg2Type, ColIdx3, op);
4785  return;
4786  }
4787 #endif //USE_OPENMP
4788  TAttrType ResType = atFlt;
4789  if(Arg1Type == atInt && Arg2Type == atInt){ printf("hooray!\n"); ResType = atInt;}
4790  for (TRowIterator RowI = BegRI(); RowI < EndRI(); RowI++) {
4791  //printf("%d %d %d %d\n", ColIdx1.Val, ColIdx2.Val, ColIdx3.Val, RowI.GetRowIdx().Val);
4792  if(ResType == atInt){
4793  TInt V1 = RowI.GetIntAttr(ColIdx1);
4794  TInt V2 = RowI.GetIntAttr(ColIdx2);
4795  if (op == aoAdd) { IntCols[ColIdx3][RowI.GetRowIdx()] = V1 + V2; }
4796  if (op == aoSub) { IntCols[ColIdx3][RowI.GetRowIdx()] = V1 - V2; }
4797  if (op == aoMul) { IntCols[ColIdx3][RowI.GetRowIdx()] = V1 * V2; }
4798  if (op == aoDiv) { IntCols[ColIdx3][RowI.GetRowIdx()] = V1 / V2; }
4799  if (op == aoMod) { IntCols[ColIdx3][RowI.GetRowIdx()] = V1 % V2; }
4800  if (op == aoMin) { IntCols[ColIdx3][RowI.GetRowIdx()] = (V1 < V2) ? V1 : V2;}
4801  if (op == aoMax) { IntCols[ColIdx3][RowI.GetRowIdx()] = (V1 > V2) ? V1 : V2;}
4802  } else{
4803  TFlt V1 = (Arg1Type == atInt) ? (TFlt)RowI.GetIntAttr(ColIdx1) : RowI.GetFltAttr(ColIdx1);
4804  TFlt V2 = (Arg2Type == atInt) ? (TFlt)RowI.GetIntAttr(ColIdx2) : RowI.GetFltAttr(ColIdx2);
4805  if (op == aoAdd) { FltCols[ColIdx3][RowI.GetRowIdx()] = V1 + V2; }
4806  if (op == aoSub) { FltCols[ColIdx3][RowI.GetRowIdx()] = V1 - V2; }
4807  if (op == aoMul) { FltCols[ColIdx3][RowI.GetRowIdx()] = V1 * V2; }
4808  if (op == aoDiv) { FltCols[ColIdx3][RowI.GetRowIdx()] = V1 / V2; }
4809  if (op == aoMod) { TExcept::Throw("Cannot find modulo for float columns"); }
4810  if (op == aoMin) { FltCols[ColIdx3][RowI.GetRowIdx()] = (V1 < V2) ? V1 : V2;}
4811  if (op == aoMax) { FltCols[ColIdx3][RowI.GetRowIdx()] = (V1 > V2) ? V1 : V2;}
4812  }
4813  }
4814 }
4815 
4816 void TTable::ColAdd(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4817  ColGenericOp(Attr1, Attr2, ResultAttrName, aoAdd);
4818 }
4819 
4820 void TTable::ColSub(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4821  ColGenericOp(Attr1, Attr2, ResultAttrName, aoSub);
4822 }
4823 
4824 void TTable::ColMul(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4825  ColGenericOp(Attr1, Attr2, ResultAttrName, aoMul);
4826 }
4827 
4828 void TTable::ColDiv(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4829  ColGenericOp(Attr1, Attr2, ResultAttrName, aoDiv);
4830 }
4831 
4832 void TTable::ColMod(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4833  ColGenericOp(Attr1, Attr2, ResultAttrName, aoMod);
4834 }
4835 
4836 void TTable::ColMin(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4837  ColGenericOp(Attr1, Attr2, ResultAttrName, aoMin);
4838 }
4839 
4840 void TTable::ColMax(const TStr& Attr1, const TStr& Attr2, const TStr& ResultAttrName) {
4841  ColGenericOp(Attr1, Attr2, ResultAttrName, aoMax);
4842 }
4843 
4844 void TTable::ColGenericOp(const TStr& Attr1, TTable& Table, const TStr& Attr2, const TStr& ResAttr,
4845  TArithOp op, TBool AddToFirstTable) {
4846  // check if attributes are valid
4847  if (!IsAttr(Attr1)) { TExcept::Throw("No attribute present: " + Attr1); }
4848  if (!Table.IsAttr(Attr2)) { TExcept::Throw("No attribute present: " + Attr2); }
4849 
4850  if (NumValidRows != Table.NumValidRows) {
4851  TExcept::Throw("Tables do not have equal number of rows");
4852  }
4853 
4854  TPair<TAttrType, TInt> Info1 = GetColTypeMap(Attr1);
4855  TPair<TAttrType, TInt> Info2 = Table.GetColTypeMap(Attr2);
4856  TAttrType Arg1Type = Info1.Val1;
4857  TAttrType Arg2Type = Info2.Val1;
4858  if (Info1.Val1 == atStr || Info2.Val1 == atStr) {
4859  TExcept::Throw("Only numeric columns supported in arithmetic operations.");
4860  }
4861  if(Arg1Type == atInt && Arg2Type == atFlt && ResAttr == ""){
4862  TExcept::Throw("Trying to write float values to an existing int-typed column");
4863  }
4864  // source column indices
4865  TInt ColIdx1 = Info1.Val2;
4866  TInt ColIdx2 = Info2.Val2;
4867 
4868  // destination column index
4869  TInt ColIdx3 = AddToFirstTable ? ColIdx1 : ColIdx2;
4870 
4871  // Create empty result column in appropriate table with type that of first attribute
4872  if (ResAttr != "") {
4873  if (AddToFirstTable) {
4874  if (Arg1Type == atInt && Arg2Type == atInt) {
4875  AddIntCol(ResAttr);
4876  } else {
4877  AddFltCol(ResAttr);
4878  }
4879  ColIdx3 = GetColIdx(ResAttr);
4880  }
4881  else {
4882  if (Arg1Type == atInt && Arg2Type == atInt) {
4883  Table.AddIntCol(ResAttr);
4884  } else {
4885  Table.AddFltCol(ResAttr);
4886  }
4887  ColIdx3 = Table.GetColIdx(ResAttr);
4888  }
4889  }
4890 
4891  /*
4892  #ifdef USE_OPENMP
4893  if(GetMP()){
4894  ColGenericOpMP(Table, AddToFirstTable, ColIdx1, ColIdx2, Arg1Type, Arg2Type, ColIdx3, op);
4895  return;
4896  }
4897  #endif