SNAP Library 4.0, Developer Reference  2017-07-27 13:18:06
SNAP, a general purpose, high performance system for analysis and manipulation of large networks
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
conv.h
Go to the documentation of this file.
1 #ifndef CONV_H
2 #define CONV_H
3 
4 namespace TSnap {
5 
7 template<class PGraph>
8 PGraph ToGraph(PTable Table, const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
9 {
10  PGraph Graph = PGraph::TObj::New();
11 
12  const TAttrType NodeType = Table->GetColType(SrcCol);
13  Assert(NodeType == Table->GetColType(DstCol));
14  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
15  const TInt DstColIdx = Table->GetColIdx(DstCol);
16 
17  // make single pass over all rows in the table
18  if (NodeType == atInt) {
19  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
20  if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
21  // add src and dst nodes to graph if they are not seen earlier
22  TInt SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
23  TInt DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
24  //Using AddNodeUnchecked ensures that no error is thrown when the same node is seen twice
25  Graph->AddNodeUnchecked(SVal);
26  Graph->AddNodeUnchecked(DVal);
27  Graph->AddEdgeUnchecked(SVal, DVal);
28  }
29  } else if (NodeType == atFlt) {
30  // node values - i.e. the unique values of src/dst col
31  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
32  THash<TFlt, TInt> FltNodeVals;
33  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
34  if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
35  // add src and dst nodes to graph if they are not seen earlier
36  TInt SVal, DVal;
37  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
38  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
39  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
40  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
41  Graph->AddEdge(SVal, DVal);
42  }
43  } else {
44  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
45  if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
46  // add src and dst nodes to graph if they are not seen earlier
47  TInt SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
48 // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
49  TInt DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
50 // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
51  //Using AddNodeUnchecked ensures that no error is thrown when the same node is seen twice
52  Graph->AddNodeUnchecked(SVal);
53  Graph->AddNodeUnchecked(DVal);
54  Graph->AddEdgeUnchecked(SVal, DVal);
55  }
56  }
57 
58  Graph->SortNodeAdjV();
59  return Graph;
60 }
61 
63 template<class PGraph>
64 PGraph ToNetwork(PTable Table,
65  const TStr& SrcCol, const TStr& DstCol,
66  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
67  TAttrAggr AggrPolicy)
68 {
69  PGraph Graph = PGraph::TObj::New();
70 
71  const TAttrType NodeType = Table->GetColType(SrcCol);
72  Assert(NodeType == Table->GetColType(DstCol));
73  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
74  const TInt DstColIdx = Table->GetColIdx(DstCol);
75 
76  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
77  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
78  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
79 
80  // node values - i.e. the unique values of src/dst col
81  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
82  THash<TFlt, TInt> FltNodeVals;
83 
84  // node attributes
85  THash<TInt, TStrIntVH> NodeIntAttrs;
86  THash<TInt, TStrFltVH> NodeFltAttrs;
87  THash<TInt, TStrStrVH> NodeStrAttrs;
88 
89  // make single pass over all rows in the table
90  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
91  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
92  continue;
93  }
94 
95  // add src and dst nodes to graph if they are not seen earlier
96  TInt SVal, DVal;
97  if (NodeType == atFlt) {
98  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
99  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
100  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
101  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
102  } else if (NodeType == atInt || NodeType == atStr) {
103  if (NodeType == atInt) {
104  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
105  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
106  } else {
107  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
108  if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
109  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
110  if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
111  }
112  if (!Graph->IsNode(SVal)) {Graph->AddNode(SVal); }
113  if (!Graph->IsNode(DVal)) {Graph->AddNode(DVal); }
114  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
115  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
116  }
117 
118  // add edge and edge attributes
119  Graph->AddEdge(SVal, DVal, CurrRowIdx);
120 
121  // Aggregate edge attributes and add to graph
122  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
123  TStr ColName = EdgeAttrV[i];
124  TAttrType T = Table->GetColType(ColName);
125  TInt Index = Table->GetColIdx(ColName);
126  switch (T) {
127  case atInt:
128  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
129  break;
130  case atFlt:
131  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
132  break;
133  case atStr:
134  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
135  break;
136  }
137  }
138 
139  // get src and dst node attributes into hashmaps
140  if ((Table->SrcNodeAttrV).Len() > 0) {
141  Table->AddNodeAttributes(SVal, Table->SrcNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
142  }
143 
144  if ((Table->DstNodeAttrV).Len() > 0) {
145  Table->AddNodeAttributes(DVal, Table->DstNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
146  }
147  }
148 
149  // aggregate node attributes and add to graph
150  if ((Table->SrcNodeAttrV).Len() > 0 || (Table->DstNodeAttrV).Len() > 0) {
151  for (TNEANet::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
152  TInt NId = NodeI.GetId();
153  if (NodeIntAttrs.IsKey(NId)) {
154  TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
155  for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
156  TInt AttrVal = Table->AggregateVector<TInt>(it.GetDat(), AggrPolicy);
157  Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
158  }
159  }
160  if (NodeFltAttrs.IsKey(NId)) {
161  TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
162  for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
163  TFlt AttrVal = Table->AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
164  Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
165  }
166  }
167  if (NodeStrAttrs.IsKey(NId)) {
168  TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
169  for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
170  TStr AttrVal = Table->AggregateVector<TStr>(it.GetDat(), AggrPolicy);
171  Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
172  }
173  }
174  }
175  }
176 
177  return Graph;
178 }
179 
181 template<class PGraph>
182 PGraph ToNetwork(PTable Table,
183  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
184 {
185  TStrV V;
186  return ToNetwork<PGraph>(Table, SrcCol, DstCol, V, AggrPolicy);
187 }
188 
189 #ifdef GCC_ATOMIC
190 template<class PGraphMP>
192 PGraphMP ToGraphMP(PTable Table, const TStr& SrcCol, const TStr& DstCol) {
193  // double start = omp_get_wtime();
194  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
195  const TInt DstColIdx = Table->GetColIdx(DstCol);
196  const TAttrType NodeType = Table->GetColType(SrcCol);
197  Assert(NodeType == Table->GetColType(DstCol));
198 
199  const TInt NumRows = Table->NumValidRows;
200 
201  TIntV SrcCol1, DstCol1, SrcCol2, DstCol2;
202 
203  #pragma omp parallel sections num_threads(4)
204  {
205  #pragma omp section
206  { SrcCol1.Reserve(NumRows, NumRows); }
207  #pragma omp section
208  { SrcCol2.Reserve(NumRows, NumRows); }
209  #pragma omp section
210  { DstCol1.Reserve(NumRows, NumRows); }
211  #pragma omp section
212  { DstCol2.Reserve(NumRows, NumRows); }
213  }
214 
215  // double endResize = omp_get_wtime();
216  // printf("Resize time = %f\n", endResize-start);
217 
218  TIntPrV Partitions;
219  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
220  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
221 
222  // double endPartition = omp_get_wtime();
223  // printf("Partition time = %f\n", endPartition-endResize);
224 
225  omp_set_num_threads(omp_get_max_threads());
226  if (NodeType == atInt) {
227  #pragma omp parallel for schedule(static)
228  for (int i = 0; i < Partitions.Len(); i++) {
229  TRowIterator RowI(Partitions[i].GetVal1(), Table());
230  TRowIterator EndI(Partitions[i].GetVal2(), Table());
231  while (RowI < EndI) {
232  TInt RowId = RowI.GetRowIdx();
233  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
234  SrcCol2[RowId] = RowI.GetIntAttr(SrcColIdx);
235  DstCol1[RowId] = RowI.GetIntAttr(DstColIdx);
236  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
237  RowI++;
238  }
239  }
240  }
241  else if (NodeType == atStr) {
242  #pragma omp parallel for schedule(static)
243  for (int i = 0; i < Partitions.Len(); i++) {
244  TRowIterator RowI(Partitions[i].GetVal1(), Table());
245  TRowIterator EndI(Partitions[i].GetVal2(), Table());
246  while (RowI < EndI) {
247  TInt RowId = RowI.GetRowIdx();
248  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
249  SrcCol2[RowId] = RowI.GetStrMapById(SrcColIdx);
250  DstCol1[RowId] = RowI.GetStrMapById(DstColIdx);
251  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
252  RowI++;
253  }
254  }
255  }
256 
257  omp_set_num_threads(omp_get_max_threads());
258  #pragma omp parallel
259  {
260  #pragma omp single nowait
261  {
262  #pragma omp task untied shared(SrcCol1, DstCol1)
263  { TTable::QSortKeyVal(SrcCol1, DstCol1, 0, NumRows-1); }
264  }
265  #pragma omp single nowait
266  {
267  #pragma omp task untied shared(SrcCol2, DstCol2)
268  { TTable::QSortKeyVal(DstCol2, SrcCol2, 0, NumRows-1); }
269  }
270  #pragma omp taskwait
271  }
272 
273  // TTable::PSRSKeyVal(SrcCol1, DstCol1, 0, NumRows-1);
274  // TTable::PSRSKeyVal(DstCol2, SrcCol2, 0, NumRows-1);
275 
276  // TInt IsS = TTable::CheckSortedKeyVal(SrcCol1, DstCol1, 0, NumRows-1);
277  // TInt IsD = TTable::CheckSortedKeyVal(DstCol2, SrcCol2, 0, NumRows-1);
278  // printf("IsSorted = %d %d\n", IsS.Val, IsD.Val);
279 
280  // double endSort = omp_get_wtime();
281  // printf("Sort time = %f\n", endSort-endCopy);
282  //return TNGraphMP::New(10, 100);
283 
284  TInt NumThreads = omp_get_max_threads();
285  TInt PartSize = (NumRows/NumThreads);
286 
287  TIntV SrcOffsets, DstOffsets;
288  SrcOffsets.Add(0);
289  for (TInt i = 1; i < NumThreads; i++) {
290  TInt CurrOffset = i * PartSize;
291  while (CurrOffset < (i+1) * PartSize &&
292  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
293  CurrOffset++;
294  }
295  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
296  }
297  SrcOffsets.Add(NumRows);
298 
299  DstOffsets.Add(0);
300  for (TInt i = 1; i < NumThreads; i++) {
301  TInt CurrOffset = i * PartSize;
302  while (CurrOffset < (i+1) * PartSize &&
303  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
304  CurrOffset++;
305  }
306  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
307  }
308  DstOffsets.Add(NumRows);
309 
310  TInt SrcPartCnt = SrcOffsets.Len()-1;
311  TInt DstPartCnt = DstOffsets.Len()-1;
312 
313  // for (TInt i = 0; i < SrcOffsets.Len(); i++) {
314  // printf("%d ", SrcOffsets[i].Val);
315  // }
316  // printf("\n");
317  // for (TInt i = 0; i < DstOffsets.Len(); i++) {
318  // printf("%d ", DstOffsets[i].Val);
319  // }
320  // printf("\n");
321 
322  TIntV SrcNodeCounts, DstNodeCounts;
323  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
324  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
325 
326  #pragma omp parallel for schedule(dynamic)
327  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
328  if (t < SrcPartCnt) {
329  TInt i = t;
330  if (SrcOffsets[i] != SrcOffsets[i+1]) {
331  SrcNodeCounts[i] = 1;
332  TInt CurrNode = SrcCol1[SrcOffsets[i]];
333  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
334  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
335  if (j < SrcOffsets[i+1]) {
336  SrcNodeCounts[i]++;
337  CurrNode = SrcCol1[j];
338  }
339  }
340  }
341  } else {
342  TInt i = t - SrcPartCnt;
343  if (DstOffsets[i] != DstOffsets[i+1]) {
344  DstNodeCounts[i] = 1;
345  TInt CurrNode = DstCol2[DstOffsets[i]];
346  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
347  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
348  if (j < DstOffsets[i+1]) {
349  DstNodeCounts[i]++;
350  CurrNode = DstCol2[j];
351  }
352  }
353  }
354  }
355  }
356 
357  // for (TInt i = 0; i < SrcNodeCounts.Len(); i++) {
358  // printf("%d ", SrcNodeCounts[i].Val);
359  // }
360  // printf("\n");
361  // for (TInt i = 0; i < DstNodeCounts.Len(); i++) {
362  // printf("%d ", DstNodeCounts[i].Val);
363  // }
364  // printf("\n");
365 
366  TInt TotalSrcNodes = 0;
367  TIntV SrcIdOffsets;
368  for (int i = 0; i < SrcPartCnt; i++) {
369  SrcIdOffsets.Add(TotalSrcNodes);
370  TotalSrcNodes += SrcNodeCounts[i];
371  }
372 
373  TInt TotalDstNodes = 0;
374  TIntV DstIdOffsets;
375  for (int i = 0; i < DstPartCnt; i++) {
376  DstIdOffsets.Add(TotalDstNodes);
377  TotalDstNodes += DstNodeCounts[i];
378  }
379 
380  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
381 
382  TIntPrV SrcNodeIds, DstNodeIds;
383  #pragma omp parallel sections
384  {
385  #pragma omp section
386  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
387  #pragma omp section
388  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
389  }
390 
391  #pragma omp parallel for schedule(dynamic)
392  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
393  if (t < SrcPartCnt) {
394  TInt i = t;
395  if (SrcOffsets[i] != SrcOffsets[i+1]) {
396  TInt CurrNode = SrcCol1[SrcOffsets[i]];
397  TInt ThreadOffset = SrcIdOffsets[i];
398  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
399  TInt CurrCount = 1;
400  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
401  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
402  if (j < SrcOffsets[i+1]) {
403  CurrNode = SrcCol1[j];
404  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
405  CurrCount++;
406  }
407  }
408  }
409  } else {
410  TInt i = t - SrcPartCnt;
411  if (DstOffsets[i] != DstOffsets[i+1]) {
412  TInt CurrNode = DstCol2[DstOffsets[i]];
413  TInt ThreadOffset = DstIdOffsets[i];
414  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
415  TInt CurrCount = 1;
416  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
417  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
418  if (j < DstOffsets[i+1]) {
419  CurrNode = DstCol2[j];
420  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
421  CurrCount++;
422  }
423  }
424  }
425  }
426  }
427 
428  // double endNode = omp_get_wtime();
429  // printf("Node time = %f\n", endNode-endSort);
430 
431  TIntTrV Nodes;
432  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
433 
434  // double endNodeResize = omp_get_wtime();
435  // printf("(NodeResize time = %f)\n", endNodeResize-endNode);
436 
437  TInt i = 0, j = 0;
438  while (i < TotalSrcNodes && j < TotalDstNodes) {
439  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
440  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
441  i++;
442  j++;
443  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
444  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
445  i++;
446  } else {
447  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
448  j++;
449  }
450  }
451  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
452  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
453 
454  // double endMerge = omp_get_wtime();
455  // printf("Merge time = %f\n", endMerge-endNode);
456 
457  TInt NumNodes = Nodes.Len();
458  // printf("NumNodes = %d\n", NumNodes.Val);
459 
460  PGraphMP Graph = TNGraphMP::New(NumNodes, NumRows);
461  NumThreads = 1;
462  int Delta = (NumNodes+NumThreads-1)/NumThreads;
463 
464  TVec<TIntV> InVV(NumNodes);
465  TVec<TIntV> OutVV(NumNodes);
466 
467  omp_set_num_threads(NumThreads);
468  #pragma omp parallel for schedule(static,Delta)
469  for (int m = 0; m < NumNodes; m++) {
470  //double startTr = omp_get_wtime();
471  //TIntV OutV, InV;
472  TInt n, i, j;
473  Nodes[m].GetVal(n, i, j);
474  if (i >= 0) {
475  TInt Offset = SrcNodeIds[i].GetVal2();
476  TInt Sz = DstCol1.Len()-Offset;
477  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
478  //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
479  OutVV[m].Reserve(Sz);
480  }
481  if (j >= 0) {
482  TInt Offset = DstNodeIds[j].GetVal2();
483  TInt Sz = SrcCol2.Len()-Offset;
484  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
485  //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
486  InVV[m].Reserve(Sz);
487  }
488  //double endTr = omp_get_wtime();
489  //printf("Thread=%d, i=%d, t=%f\n", omp_get_thread_num(), m, endTr-startTr);
490  }
491 
492  // double endAlloc = omp_get_wtime();
493  // printf("Alloc time = %f\n", endAlloc-endMerge);
494 
495  NumThreads = omp_get_max_threads();
496  Delta = (NumNodes+NumThreads-1)/(10*NumThreads);
497  omp_set_num_threads(NumThreads);
498  #pragma omp parallel for schedule(dynamic)
499  for (int m = 0; m < NumNodes; m++) {
500  //double startTr = omp_get_wtime();
501  //TIntV OutV, InV;
502  TInt n, i, j;
503  Nodes[m].GetVal(n, i, j);
504  if (i >= 0) {
505  TInt Offset = SrcNodeIds[i].GetVal2();
506  TInt Sz = DstCol1.Len()-Offset;
507  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
508  //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
509  OutVV[m].CopyUniqueFrom(DstCol1, Offset, Sz);
510  }
511  if (j >= 0) {
512  TInt Offset = DstNodeIds[j].GetVal2();
513  TInt Sz = SrcCol2.Len()-Offset;
514  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
515  //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
516  InVV[m].CopyUniqueFrom(SrcCol2, Offset, Sz);
517  }
518  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
519  //double endTr = omp_get_wtime();
520  //printf("Thread=%d, i=%d, t=%f\n", omp_get_thread_num(), m, endTr-startTr);
521  }
522  Graph->SetNodes(NumNodes);
523 
524  // double endAdd = omp_get_wtime();
525  // printf("Add time = %f\n", endAdd-endAlloc);
526 
527  return Graph;
528 }
529 
531 template<class PGraphMP>
532 PGraphMP ToGraphMP3(PTable Table, const TStr& SrcCol, const TStr& DstCol) {
533  PNGraphMP Graph;
534  int MaxThreads = omp_get_max_threads();
535  int Length, Threads, Delta, Nodes, Last;
536  uint64_t NumNodesEst;
537  TInt SrcColIdx, DstColIdx;
538  TIntV InVec, OutVec;
539 
540  SrcColIdx = Table->GetColIdx(SrcCol);
541  DstColIdx = Table->GetColIdx(DstCol);
542  const TAttrType NodeType = Table->GetColType(SrcCol);
543  Assert(NodeType == Table->GetColType(DstCol));
544 
545 
546  /* Estimate number of nodes in the graph */
547  int NumRows = Table->Next.Len();
548  double Load = 10;
549  int sz = NumRows / Load;
550  int *buckets = (int *)malloc(sz * sizeof(int));
551 
552  #pragma omp parallel for
553  for (int i = 0; i < sz; i++)
554  buckets[i] = 0;
555 
556  if (NodeType == atInt) {
557  #pragma omp parallel for
558  for (int i = 0; i < NumRows; i++) {
559  int vert = Table->IntCols[DstColIdx][i];
560  buckets[vert % sz] = 1;
561  }
562  }
563  else if (NodeType == atStr ) {
564  #pragma omp parallel for
565  for (int i = 0; i < NumRows; i++) {
566  int vert = (Table->StrColMaps)[DstColIdx][i];
567  buckets[vert % sz] = 1;
568  }
569  }
570  int cnt = 0;
571  #pragma omp parallel for reduction(+:cnt)
572  for (int i = 0; i < sz; i++) {
573  if (buckets[i] == 0)
574  cnt += 1;
575  }
576 
577  NumNodesEst = sz * log ((double)sz / cnt);
578  free (buckets);
579 
580  /* Until we correctly estimate the number of nodes */
581  while (1)
582  {
583  Graph = TNGraphMP::New(NumNodesEst, 100);
584 
585  Length = Graph->Reserved();
586  Threads = MaxThreads/2;
587  Delta = (Length + Threads - 1) / Threads;
588 
589  OutVec.Gen(Length);
590  InVec.Gen(Length);
591 
592  /* build the node hash table, count the size of edge lists */
593  Last = NumRows;
594  Nodes = 0;
595  omp_set_num_threads(Threads);
596  #pragma omp parallel for schedule(static, Delta)
597  for (int CurrRowIdx = 0; CurrRowIdx < Last; CurrRowIdx++) {
598  if ((uint64_t) Nodes + 1000 >= NumNodesEst) {
599  /* need bigger hash table */
600  continue;
601  }
602 
603  TInt SVal, DVal;
604  if (NodeType == atInt) {
605  SVal = Table->IntCols[SrcColIdx][CurrRowIdx];
606  DVal = Table->IntCols[DstColIdx][CurrRowIdx];
607  }
608  else if (NodeType == atStr ) {
609  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
610  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
611  }
612  int SrcIdx = abs((SVal.GetPrimHashCd()) % Length);
613  if (!Graph->AddOutEdge1(SrcIdx, SVal, DVal)) {
614  #pragma omp critical
615  {
616  Nodes++;
617  }
618  }
619  __sync_fetch_and_add(&OutVec[SrcIdx].Val, 1);
620 
621  int DstIdx = abs((DVal.GetPrimHashCd()) % Length);
622  if (!Graph->AddInEdge1(DstIdx, SVal, DVal)) {
623  #pragma omp critical
624  {
625  Nodes++;
626  }
627  }
628  __sync_fetch_and_add(&InVec[DstIdx].Val, 1);
629 
630  }
631  if ((uint64_t) Nodes + 1000 >= NumNodesEst) {
632  /* We need to double our num nodes estimate */
633  Graph.Clr();
634  InVec.Clr();
635  OutVec.Clr();
636  NumNodesEst *= 2;
637  }
638  else {
639  break;
640  }
641  }
642 
643  Graph->SetNodes(Nodes);
644 
645  uint Edges = 0;
646  for (int i = 0; i < Length; i++) {
647  Edges += OutVec[i] + InVec[i];
648  }
649 
650  for (int Idx = 0; Idx < Length; Idx++) {
651  if (OutVec[Idx] > 0 || InVec[Idx] > 0) {
652  Graph->ReserveNodeDegs(Idx, InVec[Idx], OutVec[Idx]);
653  }
654  }
655 
656  /* assign edges */
657  Length = Graph->Reserved();
658  Threads = MaxThreads;
659  Delta = (Length + Threads - 1) / Threads;
660 
661  omp_set_num_threads(Threads);
662  #pragma omp parallel for schedule(static,Delta)
663  for (int CurrRowIdx = 0; CurrRowIdx < Last; CurrRowIdx++) {
664  TInt SVal, DVal;
665  if (NodeType == atInt) {
666  SVal = Table->IntCols[SrcColIdx][CurrRowIdx];
667  DVal = Table->IntCols[DstColIdx][CurrRowIdx];
668  }
669  else if (NodeType == atStr) {
670  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
671  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
672  }
673 
674  Graph->AddOutEdge2(SVal, DVal);
675  Graph->AddInEdge2(SVal, DVal);
676  }
677 
678  /* sort edges */
679  Length = Graph->Reserved();
680  Threads = MaxThreads*2;
681  Delta = (Length + Threads - 1) / Threads;
682 
683  omp_set_num_threads(Threads);
684  #pragma omp parallel for schedule(dynamic)
685  for (int Idx = 0; Idx < Length; Idx++) {
686  if (OutVec[Idx] > 0 || InVec[Idx] > 0) {
687  Graph->SortEdges(Idx, InVec[Idx], OutVec[Idx]);
688  }
689  }
690 
691  return Graph;
692 }
693 
695 template<class PGraphMP>
696 inline PGraphMP ToNetworkMP(PTable Table,
697  const TStr& SrcCol, const TStr& DstCol,
698  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
699  TAttrAggr AggrPolicy) {
701 
703  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
704  const TInt DstColIdx = Table->GetColIdx(DstCol);
705  const TInt NumRows = Table->GetNumValidRows();
706 
707  const TAttrType NodeType = Table->GetColType(SrcCol);
708  Assert(NodeType == Table->GetColType(DstCol));
709 
710 
711  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
712 
713  THash<TInt, TStrIntVH> NodeIntAttrs;
714  THash<TInt, TStrFltVH> NodeFltAttrs;
715  THash<TInt, TStrStrVH> NodeStrAttrs;
716 
717  #pragma omp parallel sections num_threads(4)
718  {
719  #pragma omp section
720  { SrcCol1.Reserve(NumRows, NumRows); }
721  #pragma omp section
722  { EdgeCol1.Reserve(NumRows, NumRows); }
723  #pragma omp section
724  { DstCol2.Reserve(NumRows, NumRows); }
725  #pragma omp section
726  { EdgeCol2.Reserve(NumRows, NumRows); }
727  }
729 
731  TIntPrV Partitions;
732  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
733  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
734 
735  // double endPartition = omp_get_wtime();
736  // printf("Partition time = %f\n", endPartition-endResize);
737 
738  omp_set_num_threads(omp_get_max_threads());
739  if (NodeType == atInt) {
740  #pragma omp parallel for schedule(static)
741  for (int i = 0; i < Partitions.Len(); i++) {
742  TRowIterator RowI(Partitions[i].GetVal1(), Table());
743  TRowIterator EndI(Partitions[i].GetVal2(), Table());
744  while (RowI < EndI) {
745  TInt RowId = RowI.GetRowIdx();
746  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
747  EdgeCol1[RowId] = RowId;
748  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
749  EdgeCol2[RowId] = RowId;
750  RowI++;
751  }
752  }
753  }
754  else if (NodeType == atStr) {
755  #pragma omp parallel for schedule(static)
756  for (int i = 0; i < Partitions.Len(); i++) {
757  TRowIterator RowI(Partitions[i].GetVal1(), Table());
758  TRowIterator EndI(Partitions[i].GetVal2(), Table());
759  while (RowI < EndI) {
760  TInt RowId = RowI.GetRowIdx();
761  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
762  EdgeCol1[RowId] = RowId;
763  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
764  EdgeCol2[RowId] = RowId;
765  RowI++;
766  }
767  }
768  }
770 
771  Sw->Start(TStopwatch::Sort);
772  omp_set_num_threads(omp_get_max_threads());
773  #pragma omp parallel
774  {
775  #pragma omp single nowait
776  {
777  #ifndef GLib_WIN32
778  #pragma omp task untied shared(SrcCol1, EdgeCol1)
779  #endif
780  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
781  }
782  #pragma omp single nowait
783  {
784  #ifndef GLib_WIN32
785  #pragma omp task untied shared(EdgeCol2, DstCol2)
786  #endif
787  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
788  }
789  #ifndef GLib_WIN32
790  #pragma omp taskwait
791  #endif
792  }
793  Sw->Stop(TStopwatch::Sort);
794 
796  TInt NumThreads = omp_get_max_threads();
797  TInt PartSize = (NumRows/NumThreads);
798 
799  // Find the offset of all partitions, each of which contains a list of rows.
800  // Nodes from same sources or destinations are ensured to be kept within same partition.
801  TIntV SrcOffsets, DstOffsets;
802  SrcOffsets.Add(0);
803  for (TInt i = 1; i < NumThreads; i++) {
804  TInt CurrOffset = i * PartSize;
805  while (CurrOffset < (i+1) * PartSize &&
806  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
807  // ensure that rows from the same sources are grouped together
808  CurrOffset++;
809  }
810  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
811  }
812  SrcOffsets.Add(NumRows);
813 
814  DstOffsets.Add(0);
815  for (TInt i = 1; i < NumThreads; i++) {
816  TInt CurrOffset = i * PartSize;
817  while (CurrOffset < (i+1) * PartSize &&
818  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
819  // ensure that rows to the same destinations are grouped together
820  CurrOffset++;
821  }
822  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
823  }
824  DstOffsets.Add(NumRows);
825 
826  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
827  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
828 
829  // count the number of source nodes and destination nodes in each partition
830  TIntV SrcNodeCounts, DstNodeCounts;
831  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
832  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
833 
834  #pragma omp parallel for schedule(dynamic)
835  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
836  if (t < SrcPartCnt) {
837  TInt i = t;
838  if (SrcOffsets[i] != SrcOffsets[i+1]) {
839  SrcNodeCounts[i] = 1;
840  TInt CurrNode = SrcCol1[SrcOffsets[i]];
841  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
842  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
843  if (j < SrcOffsets[i+1]) {
844  SrcNodeCounts[i]++;
845  CurrNode = SrcCol1[j];
846  }
847  }
848  }
849  } else {
850  TInt i = t - SrcPartCnt;
851  if (DstOffsets[i] != DstOffsets[i+1]) {
852  DstNodeCounts[i] = 1;
853  TInt CurrNode = DstCol2[DstOffsets[i]];
854  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
855  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
856  if (j < DstOffsets[i+1]) {
857  DstNodeCounts[i]++;
858  CurrNode = DstCol2[j];
859  }
860  }
861  }
862  }
863  }
864 
865  TInt TotalSrcNodes = 0;
866  TIntV SrcIdOffsets;
867  for (int i = 0; i < SrcPartCnt; i++) {
868  SrcIdOffsets.Add(TotalSrcNodes);
869  TotalSrcNodes += SrcNodeCounts[i];
870  }
871 
872  TInt TotalDstNodes = 0;
873  TIntV DstIdOffsets;
874  for (int i = 0; i < DstPartCnt; i++) {
875  DstIdOffsets.Add(TotalDstNodes);
876  TotalDstNodes += DstNodeCounts[i];
877  }
878 
879  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
880 
881  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
882  TIntPrV SrcNodeIds, DstNodeIds;
883  #pragma omp parallel sections
884  {
885  #pragma omp section
886  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
887  #pragma omp section
888  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
889  }
890 
891  // Find the starting offset of each node (in both src and dst)
892  #pragma omp parallel for schedule(dynamic)
893  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
894  if (t < SrcPartCnt) {
895  TInt i = t;
896  if (SrcOffsets[i] != SrcOffsets[i+1]) {
897  TInt CurrNode = SrcCol1[SrcOffsets[i]];
898  TInt ThreadOffset = SrcIdOffsets[i];
899  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
900  TInt CurrCount = 1;
901  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
902  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
903  if (j < SrcOffsets[i+1]) {
904  CurrNode = SrcCol1[j];
905  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
906  CurrCount++;
907  }
908  }
909  }
910  } else {
911  TInt i = t - SrcPartCnt;
912  if (DstOffsets[i] != DstOffsets[i+1]) {
913  TInt CurrNode = DstCol2[DstOffsets[i]];
914  TInt ThreadOffset = DstIdOffsets[i];
915  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
916  TInt CurrCount = 1;
917  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
918  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
919  if (j < DstOffsets[i+1]) {
920  CurrNode = DstCol2[j];
921  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
922  CurrCount++;
923  }
924  }
925  }
926  }
927  }
928  Sw->Stop(TStopwatch::Group);
929 
931  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
932  TIntTrV Nodes;
933  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
934 
935  TInt i = 0, j = 0;
936  while (i < TotalSrcNodes && j < TotalDstNodes) {
937  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
938  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
939  i++;
940  j++;
941  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
942  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
943  i++;
944  } else {
945  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
946  j++;
947  }
948  }
949  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
950  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
952 
954  TInt NumNodes = Nodes.Len();
955  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
956 // NumThreads = omp_get_max_threads();
957 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
958 
959  TVec<TIntV> InVV(NumNodes);
960  TVec<TIntV> OutVV(NumNodes);
961 
962 // omp_set_num_threads(NumThreads);
963  #pragma omp parallel for schedule(static,100)
964  for (int m = 0; m < NumNodes; m++) {
965  //double startTr = omp_get_wtime();
966  //TIntV OutV, InV;
967  TInt n, i, j;
968  Nodes[m].GetVal(n, i, j);
969  if (i >= 0) {
970  TInt Offset = SrcNodeIds[i].GetVal2();
971  TInt Sz = EdgeCol1.Len()-Offset;
972  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
973  OutVV[m].Reserve(Sz);
974  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
975  }
976  if (j >= 0) {
977  TInt Offset = DstNodeIds[j].GetVal2();
978  TInt Sz = EdgeCol2.Len()-Offset;
979  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
980  InVV[m].Reserve(Sz);
981  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
982  }
983  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
984  }
985  Graph->SetNodes(NumNodes);
987 
989  omp_set_num_threads(omp_get_max_threads());
990  if (NodeType == atInt) {
991  #pragma omp parallel for schedule(static)
992  for (int i = 0; i < Partitions.Len(); i++) {
993  TRowIterator RowI(Partitions[i].GetVal1(), Table());
994  TRowIterator EndI(Partitions[i].GetVal2(), Table());
995  while (RowI < EndI) {
996  TInt RowId = RowI.GetRowIdx(); // EdgeId
997  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
998  TInt DstId = RowI.GetIntAttr(DstColIdx);
999  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1000  RowI++;
1001  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1002  TStr ColName = EdgeAttrV[ea_i];
1003  TAttrType T = Table->GetColType(ColName);
1004  TInt Index = Table->GetColIdx(ColName);
1005  switch (T) {
1006  case atInt:
1007  Graph->AddIntAttrDatE(RowId, Table->IntCols[Index][RowId], ColName);
1008  break;
1009  case atFlt:
1010  Graph->AddFltAttrDatE(RowId, Table->FltCols[Index][RowId], ColName);
1011  break;
1012  case atStr:
1013  Graph->AddStrAttrDatE(RowId, Table->GetStrVal(Index, RowId), ColName);
1014  break;
1015  }
1016  }
1017  if ((Table->SrcNodeAttrV).Len() > 0) {
1018  Table->AddNodeAttributes(SrcId, Table->SrcNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1019  }
1020 
1021  if ((Table->DstNodeAttrV).Len() > 0) {
1022  Table->AddNodeAttributes(SrcId, Table->DstNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1023  }
1024  }
1025  }
1026  }
1027  else if (NodeType == atStr) {
1028  #pragma omp parallel for schedule(static)
1029  for (int i = 0; i < Partitions.Len(); i++) {
1030  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1031  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1032  while (RowI < EndI) {
1033  TInt RowId = RowI.GetRowIdx(); // EdgeId
1034  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
1035  TInt DstId = RowI.GetStrMapById(DstColIdx);
1036  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1037  RowI++;
1038  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1039  TStr ColName = EdgeAttrV[ea_i];
1040  TAttrType T = Table->GetColType(ColName);
1041  TInt Index = Table->GetColIdx(ColName);
1042  switch (T) {
1043  case atInt:
1044  Graph->AddIntAttrDatE(RowId, Table->IntCols[Index][RowId], ColName);
1045  break;
1046  case atFlt:
1047  Graph->AddFltAttrDatE(RowId, Table->FltCols[Index][RowId], ColName);
1048  break;
1049  case atStr:
1050  Graph->AddStrAttrDatE(RowId, Table->GetStrVal(Index, RowId), ColName);
1051  break;
1052  }
1053  }
1054  if ((Table->SrcNodeAttrV).Len() > 0) {
1055  Table->AddNodeAttributes(SrcId, Table->SrcNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1056  }
1057 
1058  if ((Table->DstNodeAttrV).Len() > 0) {
1059  Table->AddNodeAttributes(SrcId, Table->DstNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
1060  }
1061 
1062  }
1063  }
1064 
1065  }
1066 
1067  // aggregate node attributes and add to graph
1068  if ((Table->SrcNodeAttrV).Len() > 0 || (Table->DstNodeAttrV).Len() > 0) {
1069  for (typename PGraphMP::TObj::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
1070  TInt NId = NodeI.GetId();
1071  if (NodeIntAttrs.IsKey(NId)) {
1072  TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
1073  for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
1074  TInt AttrVal = Table->AggregateVector<TInt>(it.GetDat(), AggrPolicy);
1075  Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
1076  }
1077  }
1078  if (NodeFltAttrs.IsKey(NId)) {
1079  TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
1080  for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
1081  TFlt AttrVal = Table->AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
1082  Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
1083  }
1084  }
1085  if (NodeStrAttrs.IsKey(NId)) {
1086  TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
1087  for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
1088  TStr AttrVal = Table->AggregateVector<TStr>(it.GetDat(), AggrPolicy);
1089  Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
1090  }
1091  }
1092  }
1093  }
1094 
1095 
1096  Graph->SetEdges(NumRows);
1098 
1099  // double endAdd = omp_get_wtime();
1100  // printf("Add time = %f\n", endAdd-endAlloc);
1101 
1102  return Graph;
1103 }
1104 
1106 template<class PGraphMP>
1107 PGraphMP ToNetworkMP(PTable Table,
1108  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
1109 {
1110  TStrV V;
1111  return ToNetworkMP<PGraphMP>(Table, SrcCol, DstCol, V,AggrPolicy);
1112 }
1113 
1114 
1115 
1117 template<class PGraphMP>
1118 inline PGraphMP ToNetworkMP2(PTable Table,
1119  const TStr& SrcCol, const TStr& DstCol,
1120  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
1121  TAttrAggr AggrPolicy) {
1123 
1125  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
1126  const TInt DstColIdx = Table->GetColIdx(DstCol);
1127  const TInt NumRows = Table->NumValidRows;
1128 
1129  const TAttrType NodeType = Table->GetColType(SrcCol);
1130  Assert(NodeType == Table->GetColType(DstCol));
1131 
1132 
1133 
1134  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
1135 
1136  #pragma omp parallel sections num_threads(4)
1137  {
1138  #pragma omp section
1139  { SrcCol1.Reserve(NumRows, NumRows); }
1140  #pragma omp section
1141  { EdgeCol1.Reserve(NumRows, NumRows); }
1142  #pragma omp section
1143  { DstCol2.Reserve(NumRows, NumRows); }
1144  #pragma omp section
1145  { EdgeCol2.Reserve(NumRows, NumRows); }
1146  }
1149  TIntPrV Partitions;
1150 // int NThreads = omp_get_max_threads();
1151  const int NThreads = 40;
1152  Table->GetPartitionRanges(Partitions, NThreads);
1153  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
1154 
1155  // double endPartition = omp_get_wtime();
1156  // printf("Partition time = %f\n", endPartition-endResize);
1157 
1158  if (NodeType == atInt) {
1159  #pragma omp parallel for schedule(static)
1160  for (int i = 0; i < Partitions.Len(); i++) {
1161  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1162  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1163  while (RowI < EndI) {
1164  TInt RowId = RowI.GetRowIdx();
1165  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
1166  EdgeCol1[RowId] = RowId;
1167  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
1168  EdgeCol2[RowId] = RowId;
1169  RowI++;
1170  }
1171  }
1172  }
1173  else if (NodeType == atStr) {
1174  #pragma omp parallel for schedule(static)
1175  for (int i = 0; i < Partitions.Len(); i++) {
1176  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1177  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1178  while (RowI < EndI) {
1179  TInt RowId = RowI.GetRowIdx();
1180  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
1181  EdgeCol1[RowId] = RowId;
1182  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
1183  EdgeCol2[RowId] = RowId;
1184  RowI++;
1185  }
1186  }
1187 
1188  }
1189 
1190 // printf("NumRows = %d\n", NumRows.Val);
1191 // printf("NThreads = %d\n", NThreads);
1192 // for (int i = 0; i < Partitions.Len(); i++) {
1193 // printf("Partition %d %d->%d\n", i, Partitions[i].GetVal1().Val, Partitions[i].GetVal2().Val);
1194 // }
1195  int Parts[NThreads+1];
1196  for (int i = 0; i < NThreads; i++) {
1197  Parts[i] = NumRows.Val / NThreads * i;
1198  }
1199  Parts[NThreads] = NumRows;
1200 // for (int i = 0; i < NThreads+1; i++) {
1201 // printf("Parts[%d] = %d\n", i, Parts[i]);
1202 // }
1204 
1205  Sw->Start(TStopwatch::Sort);
1206  TInt ExtremePoints[4][NThreads];
1207  omp_set_num_threads(omp_get_max_threads());
1208  #pragma omp parallel
1209  {
1210  #pragma omp for schedule(static) nowait
1211  for (int i = 0; i < NThreads; i++) {
1212  TInt StartPos = Parts[i];
1213  TInt EndPos = Parts[i+1]-1;
1214  // TODO: Handle empty partition
1215  TTable::QSortKeyVal(SrcCol1, EdgeCol1, StartPos, EndPos);
1216  ExtremePoints[0][i] = SrcCol1[StartPos];
1217  ExtremePoints[2][i] = SrcCol1[EndPos];
1218  }
1219  #pragma omp for schedule(static) nowait
1220  for (int i = 0; i < NThreads; i++) {
1221  TInt StartPos = Parts[i];
1222  TInt EndPos = Parts[i+1]-1;
1223  // TODO: Handle empty partition
1224  TTable::QSortKeyVal(DstCol2, EdgeCol2, StartPos, EndPos);
1225  ExtremePoints[1][i] = DstCol2[StartPos];
1226  ExtremePoints[3][i] = DstCol2[EndPos];
1227  }
1228  }
1229 // for (int i = 0; i < NThreads; i++) {
1230 // printf("ExtremePoints[%d] = %d-%d -> %d-%d\n", i, ExtremePoints[0][i].Val, ExtremePoints[1][i].Val, ExtremePoints[2][i].Val, ExtremePoints[3][i].Val);
1231 // }
1232 
1233  // find min points
1234  TInt MinId(INT_MAX);
1235  for (int j = 0; j < 2; j++) {
1236  for (int i = 0; i < NThreads; i++) {
1237  if (MinId > ExtremePoints[j][i]) { MinId = ExtremePoints[j][i]; }
1238  }
1239  }
1240  TInt MaxId(-1);
1241  for (int j = 2; j < 4; j++) {
1242  for (int i = 0; i < NThreads; i++) {
1243  if (MaxId < ExtremePoints[j][i]) { MaxId = ExtremePoints[j][i]; }
1244  }
1245  }
1246 // printf("MinId = %d\n", MinId.Val);
1247 // printf("MaxId = %d\n", MaxId.Val);
1248  Sw->Stop(TStopwatch::Sort);
1249 
1250  Sw->Start(TStopwatch::Group);
1251 // const int NumCollectors = omp_get_max_threads();
1252  const int NumCollectors = 20;
1253  int Range = MaxId.Val - MinId.Val;
1254  TIntV IdRanges(NumCollectors+1);
1255  for (int j = 0; j < NumCollectors; j++) {
1256  IdRanges[j] = MinId + Range/NumCollectors*j;
1257  }
1258  IdRanges[NumCollectors] = MaxId+1;
1259 // for (int i = 0; i < NumCollectors+1; i++) {
1260 // printf("IdRanges[%d] = %d\n", i, IdRanges[i].Val);
1261 // }
1262 
1263  int SrcOffsets[NThreads][NumCollectors+1];
1264  #pragma omp parallel for schedule(static)
1265  for (int i = 0; i < NThreads; i++) {
1266  int CollectorId = 0;
1267  for (int j = Parts[i]; j < Parts[i+1]; j++) {
1268  while (SrcCol1[j] >= IdRanges[CollectorId]) {
1269  SrcOffsets[i][CollectorId++] = j;
1270  }
1271  }
1272  while (CollectorId <= NumCollectors) {
1273  SrcOffsets[i][CollectorId++] = Parts[i+1];
1274  }
1275  }
1276  int DstOffsets[NThreads][NumCollectors+1];
1277  #pragma omp parallel for schedule(static)
1278  for (int i = 0; i < NThreads; i++) {
1279  int CollectorId = 0;
1280  for (int j = Parts[i]; j < Parts[i+1]; j++) {
1281  while (DstCol2[j] >= IdRanges[CollectorId]) {
1282  DstOffsets[i][CollectorId++] = j;
1283  }
1284  }
1285  while (CollectorId <= NumCollectors) {
1286  DstOffsets[i][CollectorId++] = Parts[i+1];
1287  }
1288  }
1289 // for (int i = 0; i < NThreads; i++) {
1290 // for (int j = 0; j < NumCollectors+1; j++) {
1291 // printf("SrcOffsets[%d][%d] = %d\n", i, j, SrcOffsets[i][j]);
1292 // }
1293 // }
1294 // for (int i = 0; i < NThreads; i++) {
1295 // for (int j = 0; j < NumCollectors+1; j++) {
1296 // printf("DstOffsets[%d][%d] = %d\n", i, j, DstOffsets[i][j]);
1297 // }
1298 // }
1299 
1300  TIntV SrcCollectorOffsets(NumCollectors+1);
1301  SrcCollectorOffsets[0] = 0;
1302  for (int k = 0; k < NumCollectors; k++) {
1303  int SumOffset = 0;
1304  for (int i = 0; i < NThreads; i++) {
1305  SumOffset += SrcOffsets[i][k+1] - SrcOffsets[i][k];
1306  }
1307  SrcCollectorOffsets[k+1] = SrcCollectorOffsets[k] + SumOffset;
1308  }
1309  TIntV DstCollectorOffsets(NumCollectors+1);
1310  DstCollectorOffsets[0] = 0;
1311  for (int k = 0; k < NumCollectors; k++) {
1312  int SumOffset = 0;
1313  for (int i = 0; i < NThreads; i++) {
1314  SumOffset += DstOffsets[i][k+1] - DstOffsets[i][k];
1315  }
1316  DstCollectorOffsets[k+1] = DstCollectorOffsets[k] + SumOffset;
1317  }
1318 // for (int i = 0; i < NumCollectors+1; i++) {
1319 // printf("SrcCollectorOffsets[%d] = %d\n", i, SrcCollectorOffsets[i].Val);
1320 // }
1321 // for (int i = 0; i < NumCollectors+1; i++) {
1322 // printf("DstCollectorOffsets[%d] = %d\n", i, DstCollectorOffsets[i].Val);
1323 // }
1324 
1325  TIntV SrcCol3, EdgeCol3, EdgeCol4, DstCol4;
1326  #pragma omp parallel sections num_threads(4)
1327  {
1328  #pragma omp section
1329  { SrcCol3.Reserve(NumRows, NumRows); }
1330  #pragma omp section
1331  { EdgeCol3.Reserve(NumRows, NumRows); }
1332  #pragma omp section
1333  { DstCol4.Reserve(NumRows, NumRows); }
1334  #pragma omp section
1335  { EdgeCol4.Reserve(NumRows, NumRows); }
1336  }
1337 
1338  TIntV SrcNodeCounts(NumCollectors), DstNodeCounts(NumCollectors);
1339  #pragma omp parallel for schedule(static)
1340  for (int k = 0; k < NumCollectors; k++) {
1341  int ind = SrcCollectorOffsets[k];
1342  for (int i = 0; i < NThreads; i++) {
1343  for (int j = SrcOffsets[i][k]; j < SrcOffsets[i][k+1]; j++) {
1344  SrcCol3[ind] = SrcCol1[j];
1345  EdgeCol3[ind] = EdgeCol1[j];
1346  ind++;
1347  }
1348  }
1349  TTable::QSortKeyVal(SrcCol3, EdgeCol3, SrcCollectorOffsets[k], SrcCollectorOffsets[k+1]-1);
1350  int SrcCount = 0;
1351  if (SrcCollectorOffsets[k+1] > SrcCollectorOffsets[k]) {
1352  SrcCount = 1;
1353  for (int j = SrcCollectorOffsets[k]+1; j < SrcCollectorOffsets[k+1]; j++) {
1354  if (SrcCol3[j] != SrcCol3[j-1]) { SrcCount++; }
1355  }
1356  }
1357  SrcNodeCounts[k] = SrcCount;
1358 
1359  ind = DstCollectorOffsets[k];
1360  for (int i = 0; i < NThreads; i++) {
1361  for (int j = DstOffsets[i][k]; j < DstOffsets[i][k+1]; j++) {
1362  DstCol4[ind] = DstCol2[j];
1363  EdgeCol4[ind] = EdgeCol2[j];
1364  ind++;
1365  }
1366  }
1367  TTable::QSortKeyVal(DstCol4, EdgeCol4, DstCollectorOffsets[k], DstCollectorOffsets[k+1]-1);
1368  int DstCount = 0;
1369  if (DstCollectorOffsets[k+1] > DstCollectorOffsets[k]) {
1370  DstCount = 1;
1371  for (int j = DstCollectorOffsets[k]+1; j < DstCollectorOffsets[k+1]; j++) {
1372  if (DstCol4[j] != DstCol4[j-1]) { DstCount++; }
1373  }
1374  }
1375  DstNodeCounts[k] = DstCount;
1376  }
1377 
1378  TInt TotalSrcNodes = 0;
1379  TIntV SrcIdOffsets;
1380  for (int i = 0; i < NumCollectors; i++) {
1381  SrcIdOffsets.Add(TotalSrcNodes);
1382  TotalSrcNodes += SrcNodeCounts[i];
1383  }
1384 
1385 // printf("Sorted = %d - %d\n", SrcCol3.IsSorted(), DstCol4.IsSorted());
1386 // for (int i = 0; i < NumRows-1; i++) {
1387 // if (SrcCol3[i] > SrcCol3[i+1]) { printf("i=%d: %d %d\n", i, SrcCol3[i].Val, SrcCol3[i+1].Val); }
1388 // }
1389 // for (int i = 0; i < NumRows-1; i++) {
1390 // if (DstCol4[i] > DstCol4[i+1]) { printf("i=%d: %d %d\n", i, DstCol4[i].Val, DstCol4[i+1].Val); }
1391 // }
1392 
1393  TInt TotalDstNodes = 0;
1394  TIntV DstIdOffsets;
1395  for (int i = 0; i < NumCollectors; i++) {
1396  DstIdOffsets.Add(TotalDstNodes);
1397  TotalDstNodes += DstNodeCounts[i];
1398  }
1399 
1400  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
1401  TIntPrV SrcNodeIds, DstNodeIds;
1402  #pragma omp parallel sections
1403  {
1404  #pragma omp section
1405  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
1406  #pragma omp section
1407  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
1408  }
1409 
1410  // Find the starting offset of each node (in both src and dst)
1411  #pragma omp parallel for schedule(dynamic)
1412  for (int t = 0; t < 2*NumCollectors; t++) {
1413  if (t < NumCollectors) {
1414  TInt i = t;
1415  if (SrcCollectorOffsets[i] < SrcCollectorOffsets[i+1]) {
1416  TInt CurrNode = SrcCol3[SrcCollectorOffsets[i]];
1417  TInt ThreadOffset = SrcIdOffsets[i];
1418  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcCollectorOffsets[i]);
1419  TInt CurrCount = 1;
1420  for (TInt j = SrcCollectorOffsets[i]+1; j < SrcCollectorOffsets[i+1]; j++) {
1421  while (j < SrcCollectorOffsets[i+1] && SrcCol3[j] == CurrNode) { j++; }
1422  if (j < SrcCollectorOffsets[i+1]) {
1423  CurrNode = SrcCol3[j];
1424  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1425  CurrCount++;
1426  }
1427  }
1428  }
1429  } else {
1430  TInt i = t - NumCollectors;
1431  if (DstCollectorOffsets[i] < DstCollectorOffsets[i+1]) {
1432  TInt CurrNode = DstCol4[DstCollectorOffsets[i]];
1433  TInt ThreadOffset = DstIdOffsets[i];
1434  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstCollectorOffsets[i]);
1435  TInt CurrCount = 1;
1436  for (TInt j = DstCollectorOffsets[i]+1; j < DstCollectorOffsets[i+1]; j++) {
1437  while (j < DstCollectorOffsets[i+1] && DstCol4[j] == CurrNode) { j++; }
1438  if (j < DstCollectorOffsets[i+1]) {
1439  CurrNode = DstCol4[j];
1440  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1441  CurrCount++;
1442  }
1443  }
1444  }
1445  }
1446  }
1447  Sw->Stop(TStopwatch::Group);
1448 
1450  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
1451  TIntTrV Nodes;
1452  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
1453 
1454  TInt i = 0, j = 0;
1455  while (i < TotalSrcNodes && j < TotalDstNodes) {
1456  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
1457  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
1458  i++;
1459  j++;
1460  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
1461  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
1462  i++;
1463  } else {
1464  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
1465  j++;
1466  }
1467  }
1468  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
1469  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
1471 
1473  TInt NumNodes = Nodes.Len();
1474  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
1475 // NumThreads = omp_get_max_threads();
1476 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
1477 
1478  TVec<TIntV> InVV(NumNodes);
1479  TVec<TIntV> OutVV(NumNodes);
1480 
1481 // omp_set_num_threads(NumThreads);
1482  #pragma omp parallel for schedule(static,100)
1483  for (int m = 0; m < NumNodes; m++) {
1484  //double startTr = omp_get_wtime();
1485  //TIntV OutV, InV;
1486  TInt n, i, j;
1487  Nodes[m].GetVal(n, i, j);
1488  if (i >= 0) {
1489  TInt Offset = SrcNodeIds[i].GetVal2();
1490  TInt Sz = EdgeCol3.Len()-Offset;
1491  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
1492  OutVV[m].Reserve(Sz);
1493  OutVV[m].CopyUniqueFrom(EdgeCol3, Offset, Sz);
1494  }
1495  if (j >= 0) {
1496  TInt Offset = DstNodeIds[j].GetVal2();
1497  TInt Sz = EdgeCol4.Len()-Offset;
1498  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
1499  InVV[m].Reserve(Sz);
1500  InVV[m].CopyUniqueFrom(EdgeCol4, Offset, Sz);
1501  }
1502  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
1503  }
1504  Graph->SetNodes(NumNodes);
1506 
1508  omp_set_num_threads(omp_get_max_threads());
1509  if (NodeType == atInt) {
1510  #pragma omp parallel for schedule(static)
1511  for (int i = 0; i < Partitions.Len(); i++) {
1512  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1513  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1514  while (RowI < EndI) {
1515  TInt RowId = RowI.GetRowIdx(); // EdgeId
1516  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
1517  TInt DstId = RowI.GetIntAttr(DstColIdx);
1518  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1519  RowI++;
1520  }
1521  }
1522  }
1523  else if (NodeType == atStr) {
1524  #pragma omp parallel for schedule(static)
1525  for (int i = 0; i < Partitions.Len(); i++) {
1526  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1527  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1528  while (RowI < EndI) {
1529  TInt RowId = RowI.GetRowIdx(); // EdgeId
1530  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
1531  TInt DstId = RowI.GetStrMapById(DstColIdx);
1532  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1533  RowI++;
1534  }
1535  }
1536  }
1537  Graph->SetEdges(NumRows);
1539 
1540  // double endAdd = omp_get_wtime();
1541  // printf("Add time = %f\n", endAdd-endAlloc);
1542 
1543  return Graph;
1544 }
1546 template<class PGraphMP>
1547 PGraphMP ToNetworkMP2(PTable Table,
1548  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
1549 {
1550  TStrV V;
1551  return ToNetworkMP2<PGraphMP>(Table, SrcCol, DstCol, V, V, V, AggrPolicy);
1552 }
1553 #endif // GCC_ATOMIC
1554 
1555 
/// Declared here; the definition is not in this header.
/// Judging only by the parameters, this loads column NCol of Table (plus the
/// columns named in NodeAttrV) as the mode called Name of the multimodal
/// network Graph. TODO confirm against the definition. The meaning of the int
/// return value is not visible here.
1557 int LoadModeNetToNet(PMMNet Graph, const TStr& Name, PTable Table, const TStr& NCol,
1558  TStrV& NodeAttrV);
/// Declared here; the definition is not in this header.
/// Presumably loads the node ids in column NCol of Table, with the attribute
/// columns NodeAttrV, into the mode Graph. TODO confirm. The meaning of the
/// int return value is not visible here.
1560 int LoadMode(TModeNet& Graph, PTable Table, const TStr& NCol,
1561  TStrV& NodeAttrV);
/// Declared here; the definition is not in this header.
/// Parameters suggest it loads the edges SrcCol -> DstCol of Table, with the
/// attribute columns EdgeAttrV, as the crossnet CrossName between modes Mode1
/// and Mode2 of Graph. TODO confirm. The meaning of the int return value is
/// not visible here.
1563 int LoadCrossNetToNet(PMMNet Graph, const TStr& Mode1, const TStr& Mode2, const TStr& CrossName,
1564  PTable Table, const TStr& SrcCol, const TStr& DstCol, TStrV& EdgeAttrV);
/// Declared here; the definition is not in this header.
/// Presumably loads the edges SrcCol -> DstCol of Table, with the attribute
/// columns EdgeAttrV, into the crossnet Graph. TODO confirm. The meaning of
/// the int return value is not visible here.
1566 int LoadCrossNet(TCrossNet& Graph, PTable Table, const TStr& SrcCol, const TStr& DstCol,
1567  TStrV& EdgeAttrV);
1568 
1569 
1571 template<class PGraph>
1572 PGraph ToNetwork(PTable Table,
1573  const TStr& SrcCol, const TStr& DstCol,
1574  TStrV& EdgeAttrV,
1575  TAttrAggr AggrPolicy) {
1576  PGraph Graph = PGraph::TObj::New();
1577 
1578  const TAttrType NodeType = Table->GetColType(SrcCol);
1579  Assert(NodeType == Table->GetColType(DstCol));
1580  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
1581  const TInt DstColIdx = Table->GetColIdx(DstCol);
1582 
1583  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
1584  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
1585  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
1586 
1587  // node values - i.e. the unique values of src/dst col
1588  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
1589  THash<TFlt, TInt> FltNodeVals;
1590 
1591  // make single pass over all rows in the table
1592  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
1593  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
1594  continue;
1595  }
1596 
1597  // add src and dst nodes to graph if they are not seen earlier
1598  TInt SVal, DVal;
1599  if (NodeType == atFlt) {
1600  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
1601  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
1602  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
1603  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
1604  }
1605  else if (NodeType == atInt || NodeType == atStr) {
1606  if (NodeType == atInt) {
1607  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
1608  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
1609  }
1610  else {
1611  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
1612  // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
1613  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
1614  // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
1615  }
1616  if (!Graph->IsNode(SVal)) {Graph->AddNode(SVal); }
1617  if (!Graph->IsNode(DVal)) {Graph->AddNode(DVal); }
1618  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
1619  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
1620  }
1621 
1622  // add edge and edge attributes
1623  Graph->AddEdge(SVal, DVal, CurrRowIdx);
1624 
1625  // Aggregate edge attributes and add to graph
1626  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
1627  TStr ColName = EdgeAttrV[i];
1628  TAttrType T = Table->GetColType(ColName);
1629  TInt Index = Table->GetColIdx(ColName);
1630  switch (T) {
1631  case atInt:
1632  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
1633  break;
1634  case atFlt:
1635  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
1636  break;
1637  case atStr:
1638  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
1639  break;
1640  }
1641  }
1642  }
1643  return Graph;
1644 
1645 }
1646 
1647 #ifdef GCC_ATOMIC
1648 
1650 template<class PGraphMP>
1651 inline PGraphMP ToNetworkMP(PTable Table,
1652  const TStr& SrcCol, const TStr& DstCol,
1653  TStrV& EdgeAttrV,
1654  TAttrAggr AggrPolicy) {
1656 
1658  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
1659  const TInt DstColIdx = Table->GetColIdx(DstCol);
1660  const TInt NumRows = Table->GetNumValidRows();
1661 
1662  const TAttrType NodeType = Table->GetColType(SrcCol);
1663  Assert(NodeType == Table->GetColType(DstCol));
1664 
1665  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
1666 
1667  THash<TInt, TStrIntVH> NodeIntAttrs;
1668  THash<TInt, TStrFltVH> NodeFltAttrs;
1669  THash<TInt, TStrStrVH> NodeStrAttrs;
1670 
1671  #pragma omp parallel sections num_threads(4)
1672  {
1673  #pragma omp section
1674  { SrcCol1.Reserve(NumRows, NumRows); }
1675  #pragma omp section
1676  { EdgeCol1.Reserve(NumRows, NumRows); }
1677  #pragma omp section
1678  { DstCol2.Reserve(NumRows, NumRows); }
1679  #pragma omp section
1680  { EdgeCol2.Reserve(NumRows, NumRows); }
1681  }
1683 
1685  TIntPrV Partitions;
1686  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
1687  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
1688 
1689 
1690  // double endPartition = omp_get_wtime();
1691  // printf("Partition time = %f\n", endPartition-endResize);
1692 
1693  omp_set_num_threads(omp_get_max_threads());
1694  if (NodeType == atInt) {
1695  #pragma omp parallel for schedule(static)
1696  for (int i = 0; i < Partitions.Len(); i++) {
1697  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1698  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1699  while (RowI < EndI) {
1700  TInt RowId = RowI.GetRowIdx();
1701  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
1702  EdgeCol1[RowId] = RowId;
1703  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
1704  EdgeCol2[RowId] = RowId;
1705  RowI++;
1706  }
1707  }
1708  }
1709  else if (NodeType == atStr) {
1710  #pragma omp parallel for schedule(static)
1711  for (int i = 0; i < Partitions.Len(); i++) {
1712  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1713  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1714  while (RowI < EndI) {
1715  TInt RowId = RowI.GetRowIdx();
1716  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
1717  EdgeCol1[RowId] = RowId;
1718  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
1719  EdgeCol2[RowId] = RowId;
1720  RowI++;
1721  }
1722  }
1723  }
1725 
1726  Sw->Start(TStopwatch::Sort);
1727  omp_set_num_threads(omp_get_max_threads());
1728  #pragma omp parallel
1729  {
1730  #pragma omp single nowait
1731  {
1732  #ifndef GLib_WIN32
1733  #pragma omp task untied shared(SrcCol1, EdgeCol1)
1734  #endif
1735  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
1736  }
1737  #pragma omp single nowait
1738  {
1739  #ifndef GLib_WIN32
1740  #pragma omp task untied shared(EdgeCol2, DstCol2)
1741  #endif
1742  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
1743  }
1744  #ifndef GLib_WIN32
1745  #pragma omp taskwait
1746  #endif
1747  }
1748  Sw->Stop(TStopwatch::Sort);
1749 
1750  Sw->Start(TStopwatch::Group);
1751  TInt NumThreads = omp_get_max_threads();
1752  TInt PartSize = (NumRows/NumThreads);
1753 
1754  // Find the offset of all partitions, each of which contains a list of rows.
1755  // Nodes from same sources or destinations are ensured to be kept within same partition.
1756  TIntV SrcOffsets, DstOffsets;
1757  SrcOffsets.Add(0);
1758  for (TInt i = 1; i < NumThreads; i++) {
1759  TInt CurrOffset = i * PartSize;
1760  while (CurrOffset < (i+1) * PartSize &&
1761  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
1762  // ensure that rows from the same sources are grouped together
1763  CurrOffset++;
1764  }
1765  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
1766  }
1767  SrcOffsets.Add(NumRows);
1768 
1769  DstOffsets.Add(0);
1770  for (TInt i = 1; i < NumThreads; i++) {
1771  TInt CurrOffset = i * PartSize;
1772  while (CurrOffset < (i+1) * PartSize &&
1773  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
1774  // ensure that rows to the same destinations are grouped together
1775  CurrOffset++;
1776  }
1777  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
1778  }
1779  DstOffsets.Add(NumRows);
1780 
1781  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
1782  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
1783 
1784  // count the number of source nodes and destination nodes in each partition
1785  TIntV SrcNodeCounts, DstNodeCounts;
1786  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
1787  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
1788 
1789  #pragma omp parallel for schedule(dynamic)
1790  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
1791  if (t < SrcPartCnt) {
1792  TInt i = t;
1793  if (SrcOffsets[i] != SrcOffsets[i+1]) {
1794  SrcNodeCounts[i] = 1;
1795  TInt CurrNode = SrcCol1[SrcOffsets[i]];
1796  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
1797  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
1798  if (j < SrcOffsets[i+1]) {
1799  SrcNodeCounts[i]++;
1800  CurrNode = SrcCol1[j];
1801  }
1802  }
1803  }
1804  } else {
1805  TInt i = t - SrcPartCnt;
1806  if (DstOffsets[i] != DstOffsets[i+1]) {
1807  DstNodeCounts[i] = 1;
1808  TInt CurrNode = DstCol2[DstOffsets[i]];
1809  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
1810  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
1811  if (j < DstOffsets[i+1]) {
1812  DstNodeCounts[i]++;
1813  CurrNode = DstCol2[j];
1814  }
1815  }
1816  }
1817  }
1818  }
1819 
1820  TInt TotalSrcNodes = 0;
1821  TIntV SrcIdOffsets;
1822  for (int i = 0; i < SrcPartCnt; i++) {
1823  SrcIdOffsets.Add(TotalSrcNodes);
1824  TotalSrcNodes += SrcNodeCounts[i];
1825  }
1826 
1827  TInt TotalDstNodes = 0;
1828  TIntV DstIdOffsets;
1829  for (int i = 0; i < DstPartCnt; i++) {
1830  DstIdOffsets.Add(TotalDstNodes);
1831  TotalDstNodes += DstNodeCounts[i];
1832  }
1833 
1834  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
1835 
1836  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
1837  TIntPrV SrcNodeIds, DstNodeIds;
1838  #pragma omp parallel sections
1839  {
1840  #pragma omp section
1841  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
1842  #pragma omp section
1843  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
1844  }
1845 
1846  // Find the starting offset of each node (in both src and dst)
1847  #pragma omp parallel for schedule(dynamic)
1848  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
1849  if (t < SrcPartCnt) {
1850  TInt i = t;
1851  if (SrcOffsets[i] != SrcOffsets[i+1]) {
1852  TInt CurrNode = SrcCol1[SrcOffsets[i]];
1853  TInt ThreadOffset = SrcIdOffsets[i];
1854  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
1855  TInt CurrCount = 1;
1856  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
1857  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
1858  if (j < SrcOffsets[i+1]) {
1859  CurrNode = SrcCol1[j];
1860  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1861  CurrCount++;
1862  }
1863  }
1864  }
1865  } else {
1866  TInt i = t - SrcPartCnt;
1867  if (DstOffsets[i] != DstOffsets[i+1]) {
1868  TInt CurrNode = DstCol2[DstOffsets[i]];
1869  TInt ThreadOffset = DstIdOffsets[i];
1870  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
1871  TInt CurrCount = 1;
1872  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
1873  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
1874  if (j < DstOffsets[i+1]) {
1875  CurrNode = DstCol2[j];
1876  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1877  CurrCount++;
1878  }
1879  }
1880  }
1881  }
1882  }
1883  Sw->Stop(TStopwatch::Group);
1884 
1886  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
1887  TIntTrV Nodes;
1888  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
1889 
1890  TInt i = 0, j = 0;
1891  while (i < TotalSrcNodes && j < TotalDstNodes) {
1892  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
1893  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
1894  i++;
1895  j++;
1896  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
1897  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
1898  i++;
1899  } else {
1900  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
1901  j++;
1902  }
1903  }
1904  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
1905  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
1907 
1909  TInt NumNodes = Nodes.Len();
1910  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
1911 // NumThreads = omp_get_max_threads();
1912 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
1913 
1914  TVec<TIntV> InVV(NumNodes);
1915  TVec<TIntV> OutVV(NumNodes);
1916 
1917 // omp_set_num_threads(NumThreads);
1918  #pragma omp parallel for schedule(static,100)
1919  for (int m = 0; m < NumNodes; m++) {
1920  //double startTr = omp_get_wtime();
1921  //TIntV OutV, InV;
1922  TInt n, i, j;
1923  Nodes[m].GetVal(n, i, j);
1924  if (i >= 0) {
1925  TInt Offset = SrcNodeIds[i].GetVal2();
1926  TInt Sz = EdgeCol1.Len()-Offset;
1927  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
1928  OutVV[m].Reserve(Sz);
1929  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
1930  }
1931  if (j >= 0) {
1932  TInt Offset = DstNodeIds[j].GetVal2();
1933  TInt Sz = EdgeCol2.Len()-Offset;
1934  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
1935  InVV[m].Reserve(Sz);
1936  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
1937  }
1938  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
1939  }
1940  Graph->SetNodes(NumNodes);
1942 
1944  omp_set_num_threads(omp_get_max_threads());
1945  if (NodeType == atInt) {
1946  #pragma omp parallel for schedule(static)
1947  for (int i = 0; i < Partitions.Len(); i++) {
1948  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1949  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1950  while (RowI < EndI) {
1951  TInt RowId = RowI.GetRowIdx(); // EdgeId
1952  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
1953  TInt DstId = RowI.GetIntAttr(DstColIdx);
1954  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1955  RowI++;
1956  }
1957  }
1958  }
1959  else if (NodeType == atStr) {
1960  #pragma omp parallel for schedule(static)
1961  for (int i = 0; i < Partitions.Len(); i++) {
1962  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1963  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1964  while (RowI < EndI) {
1965  TInt RowId = RowI.GetRowIdx(); // EdgeId
1966  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
1967  TInt DstId = RowI.GetStrMapById(DstColIdx);
1968  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1969  RowI++;
1970  }
1971  }
1972 
1973  }
1974 
1975  Graph->SetEdges(NumRows);
1976  Graph->SetMxEId(NumRows);
1978 
1979  // make single pass over all rows in the table to add attributes
1980  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
1981  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
1982  continue;
1983  }
1984  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1985  TStr ColName = EdgeAttrV[ea_i];
1986  TAttrType T = Table->GetColType(ColName);
1987  TInt Index = Table->GetColIdx(ColName);
1988  switch (T) {
1989  case atInt:
1990  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
1991  break;
1992  case atFlt:
1993  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
1994  break;
1995  case atStr:
1996  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
1997  break;
1998  }
1999  }
2000  }
2001  // double endAdd = omp_get_wtime();
2002  // printf("Add time = %f\n", endAdd-endAlloc);
2003 
2004  return Graph;
2005 }
2006 #endif // GCC_ATOMIC
2007 
2009 template<class PGraph>
2010 PGraph ToNetwork(PTable Table,
2011  const TStr& SrcCol, const TStr& DstCol,
2012  TStrV& EdgeAttrV, PTable NodeTable, const TStr& NodeCol, TStrV& NodeAttrV,
2013  TAttrAggr AggrPolicy) {
2014  PGraph Graph = PGraph::TObj::New();
2015 
2016  const TAttrType NodeType = Table->GetColType(SrcCol);
2017  Assert(NodeType == Table->GetColType(DstCol));
2018  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
2019  const TInt DstColIdx = Table->GetColIdx(DstCol);
2020 
2021 
2022  const TAttrType NodeTypeN = NodeTable->GetColType(NodeCol);
2023  const TInt NodeColIdx = NodeTable->GetColIdx(NodeCol);
2024  THash<TInt, TStrIntVH> NodeIntAttrs;
2025  THash<TInt, TStrFltVH> NodeFltAttrs;
2026  THash<TInt, TStrStrVH> NodeStrAttrs;
2027 
2028 
2029  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
2030  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
2031  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
2032 
2033  // node values - i.e. the unique values of src/dst col
2034  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
2035  THash<TFlt, TInt> FltNodeVals;
2036 
2037  // make single pass over all rows in the table
2038  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
2039  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
2040  continue;
2041  }
2042 
2043  // add src and dst nodes to graph if they are not seen earlier
2044  TInt SVal, DVal;
2045  if (NodeType == atFlt) {
2046  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
2047  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
2048  TFlt FDVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
2049  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
2050  }
2051  else if (NodeType == atInt || NodeType == atStr) {
2052  if (NodeType == atInt) {
2053  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
2054  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
2055  }
2056  else {
2057  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
2058  // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
2059  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
2060  // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
2061  }
2062  if (!Graph->IsNode(SVal)) {Graph->AddNode(SVal); }
2063  if (!Graph->IsNode(DVal)) {Graph->AddNode(DVal); }
2064  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
2065  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
2066  }
2067 
2068  // add edge and edge attributes
2069  Graph->AddEdge(SVal, DVal, CurrRowIdx);
2070 
2071  // Aggregate edge attributes and add to graph
2072  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
2073  TStr ColName = EdgeAttrV[i];
2074  TAttrType T = Table->GetColType(ColName);
2075  TInt Index = Table->GetColIdx(ColName);
2076  switch (T) {
2077  case atInt:
2078  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
2079  break;
2080  case atFlt:
2081  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
2082  break;
2083  case atStr:
2084  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
2085  break;
2086  }
2087  }
2088  }
2089 
2090 
2091  //Add node attribtes
2092  if (NodeAttrV.Len() > 0) {
2093  for (int CurrRowIdx = 0; CurrRowIdx < (NodeTable->Next).Len(); CurrRowIdx++) {
2094  if ((NodeTable->Next)[CurrRowIdx] == NodeTable->Invalid) {
2095  continue;
2096  }
2097  TInt NId;
2098  if (NodeTypeN == atInt) {
2099  NId = (NodeTable->IntCols)[NodeColIdx][CurrRowIdx];
2100  }
2101  else if (NodeTypeN == atStr){
2102  NId = (NodeTable->StrColMaps)[NodeColIdx][CurrRowIdx];
2103  }
2104  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
2105  TStr ColName = NodeAttrV[i];
2106  TAttrType T = NodeTable->GetColType(ColName);
2107  TInt Index = NodeTable->GetColIdx(ColName);
2108  switch (T) {
2109  case atInt:
2110  Graph->AddIntAttrDatN(NId, NodeTable->IntCols[Index][CurrRowIdx], ColName);
2111  break;
2112  case atFlt:
2113  Graph->AddFltAttrDatN(NId, NodeTable->FltCols[Index][CurrRowIdx], ColName);
2114  break;
2115  case atStr:
2116  Graph->AddStrAttrDatN(NId, NodeTable->GetStrVal(Index, CurrRowIdx), ColName);
2117  break;
2118  }
2119  }
2120  }
2121  }
2122 
2123  return Graph;
2124 
2125 }
2126 
2127 #ifdef GCC_ATOMIC
2128 
2130 template<class PGraphMP>
2131 inline PGraphMP ToNetworkMP(PTable Table,
2132  const TStr& SrcCol, const TStr& DstCol,
2133  TStrV& EdgeAttrV, PTable NodeTable, const TStr& NodeCol, TStrV& NodeAttrV,
2134  TAttrAggr AggrPolicy) {
2136 
2138  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
2139  const TInt DstColIdx = Table->GetColIdx(DstCol);
2140  const TInt NumRows = Table->GetNumValidRows();
2141 
2142  const TAttrType NodeType = Table->GetColType(SrcCol);
2143  Assert(NodeType == Table->GetColType(DstCol));
2144 
2145 
2146  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
2147 
2148  const TAttrType NodeTypeN = NodeTable->GetColType(NodeCol);
2149  const TInt NodeColIdx = NodeTable->GetColIdx(NodeCol);
2150  THash<TInt, TStrIntVH> NodeIntAttrs;
2151  THash<TInt, TStrFltVH> NodeFltAttrs;
2152  THash<TInt, TStrStrVH> NodeStrAttrs;
2153 
2154  #pragma omp parallel sections num_threads(4)
2155  {
2156  #pragma omp section
2157  { SrcCol1.Reserve(NumRows, NumRows); }
2158  #pragma omp section
2159  { EdgeCol1.Reserve(NumRows, NumRows); }
2160  #pragma omp section
2161  { DstCol2.Reserve(NumRows, NumRows); }
2162  #pragma omp section
2163  { EdgeCol2.Reserve(NumRows, NumRows); }
2164  }
2166 
2168  TIntPrV Partitions;
2169  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
2170  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2171 
2172  // double endPartition = omp_get_wtime();
2173  // printf("Partition time = %f\n", endPartition-endResize);
2174 
2175  omp_set_num_threads(omp_get_max_threads());
2176  if (NodeType == atInt) {
2177  #pragma omp parallel for schedule(static)
2178  for (int i = 0; i < Partitions.Len(); i++) {
2179  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2180  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2181  while (RowI < EndI) {
2182  TInt RowId = RowI.GetRowIdx();
2183  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
2184  EdgeCol1[RowId] = RowId;
2185  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
2186  EdgeCol2[RowId] = RowId;
2187  RowI++;
2188  }
2189  }
2190  }
2191  else if (NodeType == atStr) {
2192  #pragma omp parallel for schedule(static)
2193  for (int i = 0; i < Partitions.Len(); i++) {
2194  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2195  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2196  while (RowI < EndI) {
2197  TInt RowId = RowI.GetRowIdx();
2198  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
2199  EdgeCol1[RowId] = RowId;
2200  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
2201  EdgeCol2[RowId] = RowId;
2202  RowI++;
2203  }
2204  }
2205  }
2207 
2208  Sw->Start(TStopwatch::Sort);
2209  omp_set_num_threads(omp_get_max_threads());
2210  #pragma omp parallel
2211  {
2212  #pragma omp single nowait
2213  {
2214  #ifndef GLib_WIN32
2215  #pragma omp task untied shared(SrcCol1, EdgeCol1)
2216  #endif
2217  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
2218  }
2219  #pragma omp single nowait
2220  {
2221  #ifndef GLib_WIN32
2222  #pragma omp task untied shared(EdgeCol2, DstCol2)
2223  #endif
2224  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
2225  }
2226  #ifndef GLib_WIN32
2227  #pragma omp taskwait
2228  #endif
2229  }
2230  Sw->Stop(TStopwatch::Sort);
2231 
2232  Sw->Start(TStopwatch::Group);
2233  TInt NumThreads = omp_get_max_threads();
2234  TInt PartSize = (NumRows/NumThreads);
2235 
2236  // Find the offset of all partitions, each of which contains a list of rows.
2237  // Nodes from same sources or destinations are ensured to be kept within same partition.
2238  TIntV SrcOffsets, DstOffsets;
2239  SrcOffsets.Add(0);
2240  for (TInt i = 1; i < NumThreads; i++) {
2241  TInt CurrOffset = i * PartSize;
2242  while (CurrOffset < (i+1) * PartSize &&
2243  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
2244  // ensure that rows from the same sources are grouped together
2245  CurrOffset++;
2246  }
2247  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
2248  }
2249  SrcOffsets.Add(NumRows);
2250 
2251  DstOffsets.Add(0);
2252  for (TInt i = 1; i < NumThreads; i++) {
2253  TInt CurrOffset = i * PartSize;
2254  while (CurrOffset < (i+1) * PartSize &&
2255  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
2256  // ensure that rows to the same destinations are grouped together
2257  CurrOffset++;
2258  }
2259  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
2260  }
2261  DstOffsets.Add(NumRows);
2262 
2263  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
2264  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
2265 
2266  // count the number of source nodes and destination nodes in each partition
2267  TIntV SrcNodeCounts, DstNodeCounts;
2268  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
2269  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
2270 
2271  #pragma omp parallel for schedule(dynamic)
2272  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
2273  if (t < SrcPartCnt) {
2274  TInt i = t;
2275  if (SrcOffsets[i] != SrcOffsets[i+1]) {
2276  SrcNodeCounts[i] = 1;
2277  TInt CurrNode = SrcCol1[SrcOffsets[i]];
2278  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
2279  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
2280  if (j < SrcOffsets[i+1]) {
2281  SrcNodeCounts[i]++;
2282  CurrNode = SrcCol1[j];
2283  }
2284  }
2285  }
2286  } else {
2287  TInt i = t - SrcPartCnt;
2288  if (DstOffsets[i] != DstOffsets[i+1]) {
2289  DstNodeCounts[i] = 1;
2290  TInt CurrNode = DstCol2[DstOffsets[i]];
2291  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
2292  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
2293  if (j < DstOffsets[i+1]) {
2294  DstNodeCounts[i]++;
2295  CurrNode = DstCol2[j];
2296  }
2297  }
2298  }
2299  }
2300  }
2301 
2302  TInt TotalSrcNodes = 0;
2303  TIntV SrcIdOffsets;
2304  for (int i = 0; i < SrcPartCnt; i++) {
2305  SrcIdOffsets.Add(TotalSrcNodes);
2306  TotalSrcNodes += SrcNodeCounts[i];
2307  }
2308 
2309  TInt TotalDstNodes = 0;
2310  TIntV DstIdOffsets;
2311  for (int i = 0; i < DstPartCnt; i++) {
2312  DstIdOffsets.Add(TotalDstNodes);
2313  TotalDstNodes += DstNodeCounts[i];
2314  }
2315 
2316  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
2317 
2318  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
2319  TIntPrV SrcNodeIds, DstNodeIds;
2320  #pragma omp parallel sections
2321  {
2322  #pragma omp section
2323  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
2324  #pragma omp section
2325  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
2326  }
2327 
2328  // Find the starting offset of each node (in both src and dst)
2329  #pragma omp parallel for schedule(dynamic)
2330  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
2331  if (t < SrcPartCnt) {
2332  TInt i = t;
2333  if (SrcOffsets[i] != SrcOffsets[i+1]) {
2334  TInt CurrNode = SrcCol1[SrcOffsets[i]];
2335  TInt ThreadOffset = SrcIdOffsets[i];
2336  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
2337  TInt CurrCount = 1;
2338  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
2339  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
2340  if (j < SrcOffsets[i+1]) {
2341  CurrNode = SrcCol1[j];
2342  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
2343  CurrCount++;
2344  }
2345  }
2346  }
2347  } else {
2348  TInt i = t - SrcPartCnt;
2349  if (DstOffsets[i] != DstOffsets[i+1]) {
2350  TInt CurrNode = DstCol2[DstOffsets[i]];
2351  TInt ThreadOffset = DstIdOffsets[i];
2352  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
2353  TInt CurrCount = 1;
2354  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
2355  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
2356  if (j < DstOffsets[i+1]) {
2357  CurrNode = DstCol2[j];
2358  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
2359  CurrCount++;
2360  }
2361  }
2362  }
2363  }
2364  }
2365  Sw->Stop(TStopwatch::Group);
2366 
2368  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
2369  TIntTrV Nodes;
2370  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
2371 
2372  TInt i = 0, j = 0;
2373  while (i < TotalSrcNodes && j < TotalDstNodes) {
2374  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
2375  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
2376  i++;
2377  j++;
2378  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
2379  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
2380  i++;
2381  } else {
2382  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
2383  j++;
2384  }
2385  }
2386  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
2387  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
2389 
2391  TInt NumNodes = Nodes.Len();
2392  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
2393 // NumThreads = omp_get_max_threads();
2394 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
2395 
2396  TVec<TIntV> InVV(NumNodes);
2397  TVec<TIntV> OutVV(NumNodes);
2398 
2399 // omp_set_num_threads(NumThreads);
2400  #pragma omp parallel for schedule(static,100)
2401  for (int m = 0; m < NumNodes; m++) {
2402  //double startTr = omp_get_wtime();
2403  //TIntV OutV, InV;
2404  TInt n, i, j;
2405  Nodes[m].GetVal(n, i, j);
2406  if (i >= 0) {
2407  TInt Offset = SrcNodeIds[i].GetVal2();
2408  TInt Sz = EdgeCol1.Len()-Offset;
2409  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
2410  OutVV[m].Reserve(Sz);
2411  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
2412  }
2413  if (j >= 0) {
2414  TInt Offset = DstNodeIds[j].GetVal2();
2415  TInt Sz = EdgeCol2.Len()-Offset;
2416  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
2417  InVV[m].Reserve(Sz);
2418  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
2419  }
2420  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
2421  }
2422  Graph->SetNodes(NumNodes);
2424 
2426  omp_set_num_threads(omp_get_max_threads());
2427  if (NodeType == atInt) {
2428  #pragma omp parallel for schedule(static)
2429  for (int i = 0; i < Partitions.Len(); i++) {
2430  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2431  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2432  while (RowI < EndI) {
2433  TInt RowId = RowI.GetRowIdx(); // EdgeId
2434  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
2435  TInt DstId = RowI.GetIntAttr(DstColIdx);
2436  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
2437  RowI++;
2438  }
2439  }
2440  }
2441  else if (NodeType == atStr) {
2442  #pragma omp parallel for schedule(static)
2443  for (int i = 0; i < Partitions.Len(); i++) {
2444  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2445  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2446  while (RowI < EndI) {
2447  TInt RowId = RowI.GetRowIdx(); // EdgeId
2448  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
2449  TInt DstId = RowI.GetStrMapById(DstColIdx);
2450  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
2451  RowI++;
2452  }
2453  }
2454 
2455  }
2456 
2457  Graph->SetEdges(NumRows);
2458  Graph->SetMxEId(NumRows);
2460 
2461  // make single pass over all rows in the table to add attributes
2462  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
2463  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
2464  continue;
2465  }
2466  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
2467  TStr ColName = EdgeAttrV[ea_i];
2468  TAttrType T = Table->GetColType(ColName);
2469  TInt Index = Table->GetColIdx(ColName);
2470  switch (T) {
2471  case atInt:
2472  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
2473  break;
2474  case atFlt:
2475  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
2476  break;
2477  case atStr:
2478  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrVal(Index, CurrRowIdx), ColName);
2479  break;
2480  }
2481  }
2482  }
2483 
2484  // Add node attribtes
2485  if (NodeAttrV.Len() > 0) {
2486  for (int CurrRowIdx = 0; CurrRowIdx < (NodeTable->Next).Len(); CurrRowIdx++) {
2487  if ((NodeTable->Next)[CurrRowIdx] == NodeTable->Invalid) {
2488  continue;
2489  }
2490  TInt NId;
2491  if (NodeTypeN == atInt) {
2492  NId = (NodeTable->IntCols)[NodeColIdx][CurrRowIdx];
2493  }
2494  else if (NodeTypeN == atStr){
2495  NId = (NodeTable->StrColMaps)[NodeColIdx][CurrRowIdx];
2496  }
2497  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
2498  TStr ColName = NodeAttrV[i];
2499  TAttrType T = NodeTable->GetColType(ColName);
2500  TInt Index = NodeTable->GetColIdx(ColName);
2501  switch (T) {
2502  case atInt:
2503  Graph->AddIntAttrDatN(NId, NodeTable->IntCols[Index][CurrRowIdx], ColName);
2504  break;
2505  case atFlt:
2506  Graph->AddFltAttrDatN(NId, NodeTable->FltCols[Index][CurrRowIdx], ColName);
2507  break;
2508  case atStr:
2509  Graph->AddStrAttrDatN(NId, NodeTable->GetStrVal(Index, CurrRowIdx), ColName);
2510  break;
2511  }
2512  }
2513  }
2514  }
2515  // double endAdd = omp_get_wtime();
2516  // printf("Add time = %f\n", endAdd-endAlloc);
2517 
2518  return Graph;
2519 }
2520 #endif // GCC_ATOMIC
2521 
2522 }; // TSnap namespace
2523 
2524 #endif // CONV_H
2525 
int GetPrimHashCd() const
Definition: dt.h:1168
TPair< TInt, TInt > TIntPr
Definition: ds.h:83
enum TAttrType_ TAttrType
Types for tables, sparse and dense attributes.
int LoadModeNetToNet(PMMNet Graph, const TStr &Name, PTable Table, const TStr &NCol, TStrV &NodeAttrV)
Loads a mode, with name Name, into the PMMNet from the TTable. NCol specifies the node id column and ...
Definition: conv.cpp:6
TInt GetIntAttr(TInt ColIdx) const
Returns value of integer attribute specified by integer column index for current row.
Definition: table.cpp:155
int Val
Definition: dt.h:1136
PGraphMP ToGraphMP(PTable Table, const TStr &SrcCol, const TStr &DstCol)
Performs table to graph conversion in parallel using the sort-first algorithm. This is the recommende...
Definition: conv.h:192
unsigned int uint
Definition: bd.h:11
TIter BegI() const
Definition: hash.h:213
TSizeTy Len() const
Returns the number of elements in the vector.
Definition: ds.h:575
static PNGraphMP New()
Static constructor that returns a pointer to the graph. Call: PNGraphMP Graph = TNGraphMP::New().
Definition: graphmp.h:141
void Start(const TExperiment Exp)
Start a new experiment.
Definition: util.cpp:733
TAttrAggr
Possible policies for aggregating node attributes.
Definition: table.h:257
const TDat & GetDat(const TKey &Key) const
Definition: hash.h:262
Node iterator. Only forward iteration (operator++) is supported.
Definition: network.h:1792
TIter EndI() const
Definition: hash.h:218
void Clr()
Definition: bd.h:502
void CopyUniqueFrom(TVec< TVal, TSizeTy > &Vec, TInt Offset, TInt Sz)
Copy Sz values from Vec starting at Offset.
Definition: ds.h:1082
Definition: gbase.h:23
Definition: dt.h:1383
void Stop(const TExperiment Exp)
Stop the current experiment.
Definition: util.cpp:737
Iterator class for TTable rows.
Definition: table.h:330
const TVal & GetVal(const TSizeTy &ValN) const
Returns a reference to the element at position ValN in the vector.
Definition: ds.h:649
int LoadCrossNetToNet(PMMNet Graph, const TStr &Mode1, const TStr &Mode2, const TStr &CrossName, PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &EdgeAttrV)
Loads a crossnet from Mode1 to Mode2, with name CrossName, from the provided TTable. EdgeAttrV specifies edge attributes.
Definition: conv.cpp:60
void Clr(const bool &DoDel=true, const TSizeTy &NoDelLim=-1)
Clears the contents of the vector.
Definition: ds.h:1022
PGraph ToGraph(PTable Table, const TStr &SrcCol, const TStr &DstCol, TAttrAggr AggrPolicy)
Sequentially converts the table into a graph with links from nodes in SrcCol to those in DstCol...
Definition: conv.h:8
int LoadCrossNet(TCrossNet &Graph, PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &EdgeAttrV)
Loads the edges from the TTable and EdgeAttrV specifies columns containing edge attributes.
Definition: conv.cpp:69
PGraphMP ToNetworkMP(PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &SrcAttrV, TStrV &DstAttrV, TStrV &EdgeAttrV, TAttrAggr AggrPolicy)
Does Table to Network conversion in parallel using the sort-first algorithm. This is the recommended ...
Definition: conv.h:696
#define Assert(Cond)
Definition: bd.h:251
PGraph ToNetwork(PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &SrcAttrV, TStrV &DstAttrV, TStrV &EdgeAttrV, TAttrAggr AggrPolicy)
Converts the Table into a graph with edges from SrcCol to DstCol, and attribute vector defined by the...
Definition: conv.h:64
int LoadMode(TModeNet &Graph, PTable Table, const TStr &NCol, TStrV &NodeAttrV)
Loads the nodes specified in column NCol from the TTable with the attributes specified in NodeAttrV...
Definition: conv.cpp:14
TInt GetRowIdx() const
Gets the id of the row pointed by this iterator.
Definition: table.cpp:151
TInt GetStrMapById(TInt ColIdx) const
Returns integer mapping of a string attribute value specified by string column index for current row...
Definition: table.cpp:186
static void QSortKeyVal(TIntV &Key, TIntV &Val, TInt Start, TInt End)
Definition: table.cpp:5378
The nodes of one particular mode in a TMMNet, and their neighbor vectors as TIntV attributes...
Definition: mmnet.h:23
Definition: dt.h:1134
PGraphMP ToGraphMP3(PTable Table, const TStr &SrcCol, const TStr &DstCol)
Performs table to graph conversion in parallel. Uses the hash-first method, which is less optimal...
Definition: conv.h:532
Definition: dt.h:412
Definition: hash.h:97
Definition: gbase.h:23
Definition: bd.h:196
TTriple< TInt, TInt, TInt > TIntTr
Definition: ds.h:171
void Gen(const TSizeTy &_Vals)
Constructs a vector (an array) of _Vals elements.
Definition: ds.h:523
void Reserve(const TSizeTy &_MxVals)
Reserves enough memory for the vector to store _MxVals elements.
Definition: ds.h:543
static TStopwatch * GetInstance()
Definition: util.h:82
Definition: gbase.h:23
bool IsKey(const TKey &Key) const
Definition: hash.h:258
TSizeTy Add()
Adds a new element at the end of the vector, after its current last element.
Definition: ds.h:602
PGraphMP ToNetworkMP2(PTable Table, const TStr &SrcCol, const TStr &DstCol, TStrV &SrcAttrV, TStrV &DstAttrV, TStrV &EdgeAttrV, TAttrAggr AggrPolicy)
Implements table to network conversion in parallel. Not the recommended algorithm; use ToNetworkMP instead.
Definition: conv.h:1118
Implements a single CrossNet consisting of edges between two TModeNets (could be the same TModeNet) ...
Definition: mmnet.h:133
Routines to benchmark table operations.
Definition: util.h:71