17 while (!(Curr == NULL && Prev ==
Root)) {
19 if (Prev == NULL || Prev == Curr->
Parent) {
21 if (Curr->
Left != NULL) {
24 }
else if (Curr->
Right != NULL) {
32 }
else if (Prev == Curr->
Left) {
142 if (
CurrRowIdx == TTable::Last) {
return false; }
143 if (RowI.
CurrRowIdx == TTable::Last) {
return true; }
203 Result =
TBool(
false);
216 CurrRowIdx(RowIdx), Table(TablePtr), Start(RowIdx == TablePtr->FirstValidRow) {}
230 if (
CurrRowIdx == TTable::Last) {
return false; }
231 if (RowI.
CurrRowIdx == TTable::Last) {
return true; }
295 Result =
TBool(
false);
303 FirstValidRow(0), LastValidRow(-1) {}
306 NumValidRows(0), FirstValidRow(0), LastValidRow(-1) {}
309 NumRows(0), NumValidRows(0), FirstValidRow(0), LastValidRow(-1), IsNextDirty(0) {
313 for (
TInt i = 0; i < TableSchema.
Len(); i++) {
314 TStr ColName = TableSchema[i].Val1;
379 NumValidRows(SIn), FirstValidRow(SIn), LastValidRow(SIn), Next(SIn), IntCols(SIn),
380 FltCols(SIn), StrColMaps(SIn) {
387 NumValidRows(H.Len()), FirstValidRow(0), LastValidRow(H.Len()-1) {
414 NumRows(H.Len()), NumValidRows(H.Len()), FirstValidRow(0), LastValidRow(H.Len()-1) {
439 Sch(Table.Sch), SrcCol(Table.SrcCol), DstCol(Table.DstCol), EdgeAttrV(Table.EdgeAttrV),
440 SrcNodeAttrV(Table.SrcNodeAttrV), DstNodeAttrV(Table.DstNodeAttrV),
441 CommonNodeAttrs(Table.CommonNodeAttrs) {
458 TSsParser Ss(InFNm,
'\t',
false,
false,
false);
459 TInt rowsToPeek = 1000;
461 TInt lastComment = 0;
473 for (
TInt i = 0; i < numCols; i++) {
476 else if (Ss.
IsFlt(i)) {
484 if (currRow > rowsToPeek || Ss.
Eof())
break;
488 TSsParser SsNames(InFNm, Separator,
false,
false,
false);
489 for (
int i = 0; i < lastComment; i++) { SsNames.
Next();}
491 TStr first(SsNames[0]);
494 if (first != comment) {
495 for (
int i = 1; i < first.
Len(); i++){
496 if (first[i] !=
' ') { begin = i;
break;}
500 for (
int i = 1; i < SsNames.
GetFlds(); i++) {attrV.
Add(SsNames[i]);}
501 for (
TInt i = 0; i < numCols; i++) {
508 const char& Separator,
TBool HasTitleLine) {
510 TInt RowLen = T->Sch.Len();
512 for (
TInt i = 0; i < RowLen; i++) {
513 ColTypes[i] = T->GetSchemaColType(i);
528 TInt L = strlen(Ss[i]);
529 if (Ss[i][L-1] <
' ') { Ss[i][L-1] = 0; }
540 int NumThreads = omp_get_max_threads();
542 uint64 Delta = Rem / NumThreads;
543 if (Delta < 1) Delta = 1;
550 for (
int i = 1; i < NumThreads; i++) {
551 StartIntV[i] = StartIntV[i-1] + Delta;
556 omp_set_num_threads(NumThreads);
557 #pragma omp parallel for schedule(dynamic) reduction(+:Cnt)
558 for (
int i = 0; i < NumThreads; i++) {
560 Cnt += LineCountV[i];
565 for (
int i = 1; i < NumThreads; i++) {
566 PrefixSumV[i] = PrefixSumV[i-1] + LineCountV[i-1];
573 for (
TInt i = 0; i < RowLen; i++) {
574 switch (ColTypes[i]) {
576 T->IntCols[IntColIdx].Gen(Cnt);
580 T->FltCols[FltColIdx].Gen(Cnt);
589 omp_set_num_threads(NumThreads);
590 #pragma omp parallel for schedule(dynamic) reduction(+:Cnt)
591 for (
int i = 0; i < NumThreads; i++) {
599 if (FieldsV.
Len() != S.
Len()) {
604 TInt RowIdx = PrefixSumV[i] + k;
606 for (
TInt j = 0; j < RowLen; j++) {
607 switch (ColTypes[j]) {
609 if (RelevantCols.
Len() == 0) {
610 T->IntCols[IntColIdx][RowIdx] = \
613 T->IntCols[IntColIdx][RowIdx] = \
619 if (RelevantCols.
Len() == 0) {
620 T->FltCols[FltColIdx][RowIdx] = \
623 T->FltCols[FltColIdx][RowIdx] = \
639 T->NumValidRows = T->NumRows;
644 omp_set_num_threads(NumThreads);
645 #pragma omp parallel for schedule(dynamic, 10000)
646 for (
int64 i = 0; i < Cnt-1; i++) {
650 T->Next[Cnt-1] =
Last;
651 T->LastValidRow = T->NumRows - 1;
653 T->IdColName =
"_id";
654 TInt IdCol = T->IntCols.Add();
655 T->IntCols[IdCol].Gen(Cnt);
658 omp_set_num_threads(NumThreads);
659 #pragma omp parallel for schedule(dynamic, 10000)
660 for (
int64 i = 0; i < Cnt; i++) {
661 T->IntCols[IdCol][i] = i;
664 T->AddSchemaCol(T->IdColName,
atInt);
665 T->AddColType(T->IdColName,
atInt, T->IntCols.Len()-1);
671 const char& Separator,
TBool HasTitleLine) {
673 int RowLen = T->Sch.Len();
675 for (
int i = 0; i < RowLen; i++) {
676 ColTypes[i] = T->GetSchemaColType(i);
688 for (
int i = 0; i < Ss.
GetFlds(); i++) {
690 int L = strlen(Ss[i]);
691 if (Ss[i][L-1] <
' ') { Ss[i][L-1] = 0; }
707 for (
int i = 0; i < RowLen; i++) {
708 switch (ColTypes[i]) {
710 if (RelevantCols.
Len() == 0) {
711 T->IntCols[IntColIdx].Add(Ss.
GetInt(i));
713 T->IntCols[IntColIdx].Add(Ss.
GetInt(RelevantCols[i]));
718 if (RelevantCols.
Len() == 0) {
719 T->FltCols[FltColIdx].Add(Ss.
GetFlt(i));
721 T->FltCols[FltColIdx].Add(Ss.
GetFlt(RelevantCols[i]));
727 if (RelevantCols.
Len() == 0) {
730 ColIdx = RelevantCols[i];
733 T->AddStrVal(StrColIdx, Sval);
742 T->NumRows =
static_cast<int>(Cnt);
743 T->NumValidRows = T->NumRows;
746 T->Next.Gen(static_cast<int>(Cnt));
747 for (
uint64 i = 0; i < Cnt-1; i++) {
748 T->Next[
static_cast<int>(i)] = static_cast<int>(i+1);
751 T->Next[
static_cast<int>(Cnt-1)] =
Last;
752 T->LastValidRow = T->NumRows - 1;
758 const TIntV& RelevantCols,
const char& Separator,
TBool HasTitleLine) {
760 bool NoStringCols =
true;
764 if (RelevantCols.
Len() == 0) {
767 for (
int i = 0; i < RelevantCols.
Len(); i++) {
768 SR.
Add(S[RelevantCols[i]]);
774 for (
int i = 0; i < SR.
Len(); i++) {
775 if (T->GetSchemaColType(i) ==
atStr) {
776 NoStringCols =
false;
781 if (
GetMP() && NoStringCols) {
785 LoadSSPar(T, S, InFNm, RelevantCols, Separator, HasTitleLine);
787 LoadSSSeq(T, S, InFNm, RelevantCols, Separator, HasTitleLine);
790 LoadSSSeq(T, S, InFNm, RelevantCols, Separator, HasTitleLine);
796 const char& Separator,
TBool HasTitleLine) {
797 return LoadSS(S, InFNm, Context,
TIntV(), Separator, HasTitleLine);
802 printf(
"Table is empty");
805 FILE* F = fopen(OutFNm.
CStr(),
"w");
808 printf(
"failed to open file %s\n", OutFNm.
CStr());
821 for (
TInt i = 0; i < L-1; i++) {
822 fprintf(F,
"%s\t", DSch[i].Val1.CStr());
824 fprintf(F,
"%s\n", DSch[L-1].Val1.CStr());
827 for (
TInt i = 0; i < L; i++) {
828 char C = (i == L-1) ?
'\n' :
'\t';
883 ColTypeIntMap.
Save(SOut);
895 for (
TInt i = 0; i < L-1; i++) {
896 fprintf(OutF,
"%s\t", DSch[i].Val1.CStr());
898 fprintf(OutF,
"%s\n", DSch[L-1].Val1.CStr());
901 for (
TInt i = 0; i < L; i++) {
902 char C = (i == L-1) ?
'\n' :
'\t';
926 for (
TInt i = 0; i < L; i++) {
936 TInt RowIdx = RowI.GetRowIdx();
938 printf(
"ChangeContext in %d %d %d .%s.\n",
946 for (
TInt i = 0; i < L; i++) {
956 TInt RowIdx = RowI.GetRowIdx();
993 for (
TInt i = 0; i < Attrs.
Len(); i++) {
998 for (
TInt i = 0; i < Attrs.
Len(); i++) {
1115 if (
Sch[c].Val1 == NColName) {
1140 Assert(RowIdx != TTable::Invalid);
1141 if (RowIdx == TTable::Last) {
return; }
1154 for (
TInt i = 0; i < KeepV.
Len(); i++) {
1161 if (KeepSize < KeepV.
Len()) {
1179 if (
NumValidRows % NumPartitions != 0) PartitionSize++;
1180 if (PartitionSize < 10) {
1184 Partitions.
Reserve(NumPartitions+1);
1187 TInt currStart = currRow;
1189 TInt currCount = PartitionSize;
1190 while (currRow != TTable::Last) {
1191 if (currCount == 0) {
1192 Partitions.
Add(
TIntPr(currStart, currRow));
1193 currStart = currRow;
1194 currCount = PartitionSize;
1196 currRow =
Next[currRow];
1199 Partitions.
Add(
TIntPr(currStart, currRow));
1202 currRow += PartitionSize;
1203 while (currRow != TTable::Last && currRow <
Next.
Len()) {
1204 if (
Next[currRow] == TTable::Invalid) { currRow++;
continue; }
1205 Partitions.
Add(
TIntPr(currStart, currRow));
1206 currStart = currRow;
1207 currRow += PartitionSize;
1209 Partitions.
Add(
TIntPr(currStart, TTable::Last));
1227 gettimeofday(&timer0, NULL);
1232 if(!UsePhysicalIds && IdColIdx < 0){
1233 TExcept::Throw(
"Grouping: Either use physical row ids, or have an id column");
1239 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
1247 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) //num_threads(1)
1248 for (
int i = 0; i < Partitions.
Len(); i++){
1251 while (RowI < EndI) {
1254 UpdateGrouping<TInt>(Grouping, RowI.
GetIntAttr(GroupByColIdx), idx);
1258 gettimeofday(&timer0, NULL);
1264 #endif // GCC_ATOMIC
1267 TIntV RemainingRows;
1274 RemainingRows.
Add(it->Dat[0]);
1282 RemainingRows.
Add(it->Dat[0]);
1290 RemainingRows.
Add(it->Dat[0]);
1299 if(Cols.
Len() == 1){
1306 GroupAux(NCols, Grouping, Ordered,
"",
true, UniqueVec,
true);
1316 for (
TInt i = 0; i < GroupAndRowIds.Len(); i++) {
1317 IntCols[L-1][GroupAndRowIds[i].Val2] = GroupAndRowIds[i].Val1;
1325 if(!UsePhysicalIds && IdColIdx < 0){
1326 TExcept::Throw(
"Grouping: Either use physical row ids, or have an id column");
1328 TIntV IntGroupByCols;
1329 TIntV FltGroupByCols;
1330 TIntV StrGroupByCols;
1332 for (
TInt c = 0; c < GroupBy.
Len(); c++) {
1339 switch (ColType.
Val1) {
1341 IntGroupByCols.
Add(ColType.
Val2);
1344 FltGroupByCols.
Add(ColType.
Val2);
1347 StrGroupByCols.
Add(ColType.
Val2);
1352 TInt IKLen = IntGroupByCols.
Len();
1353 TInt FKLen = FltGroupByCols.
Len();
1354 TInt SKLen = StrGroupByCols.
Len();
1362 TIntV IKey(IKLen + SKLen, 0);
1363 TFltV FKey(FKLen, 0);
1364 TIntV SKey(SKLen, 0);
1367 for (
TInt c = 0; c < IKLen; c++) {
1368 IKey.
Add(it.GetIntAttr(IntGroupByCols[c]));
1370 for (
TInt c = 0; c < FKLen; c++) {
1371 FKey.
Add(it.GetFltAttr(FltGroupByCols[c]));
1373 for (
TInt c = 0; c < SKLen; c++) {
1374 SKey.
Add(it.GetStrMapById(StrGroupByCols[c]));
1377 if (IKLen > 0) { IKey.
ISort(0, IKey.
Len()-1,
true); }
1378 if (FKLen > 0) { FKey.
ISort(0, FKey.
Len()-1,
true); }
1379 if (SKLen > 0) { SKey.
ISort(0, SKey.
Len()-1,
true); }
1381 for (
TInt c = 0; c < SKLen; c++) {
1388 TInt RowIdx = it.GetRowIdx();
1389 TInt idx = UsePhysicalIds ? it.GetRowIdx() :
IntCols[IdColIdx][it.GetRowIdx()];
1390 if (!Grouping.IsKey(GroupKey)) {
1393 NewGroup.
Val1 = GroupNum;
1395 Grouping.AddDat(GroupKey, NewGroup);
1396 if (GroupColName !=
"") {
1408 if (GroupColName !=
"") {
1432 if (GroupColName !=
"") {
1574 GroupAux(NGroupBy, Grouping, Ordered, NGroupColName,
false, UniqueVec, UsePhysicalIds);
1588 for (
TInt c = 0; c < GroupByAttrs.
Len(); c++) {
1606 TInt NumOfGroups = 0;
1607 TInt GroupingCase = 0;
1610 GroupStmt Stmt(NGroupByAttrs, Ordered, UsePhysicalIds);
1614 if(NGroupByAttrs.
Len() == 1){
1619 GroupByIntColMP(NGroupByAttrs[0], GroupByIntMapping_MP, UsePhysicalIds);
1622 GroupByIntMPKeys[x] = it.GetKey();
1638 #endif // GCC_ATOMIC
1640 NumOfGroups = GroupByIntMapping.
Len();
1645 NumOfGroups = GroupByFltMapping.
Len();
1650 NumOfGroups = GroupByStrMapping.
Len();
1658 GroupAux(NGroupByAttrs, Mapping_aux, Ordered,
"",
false, UniqueVector, UsePhysicalIds);
1660 Mapping.
AddDat(it.GetKey(), it.GetDat().Val2);
1662 NumOfGroups = Mapping.
Len();
1688 #pragma omp parallel for schedule(dynamic)
1690 for (
int g = 0; g < NumOfGroups; g++) {
1691 TIntV* GroupRows = NULL;
1692 switch(GroupingCase){
1697 GroupRows = & GroupByIntMapping.
GetDat(GroupByIntMapping.
GetKey(g));
1700 GroupRows = & GroupByIntMapping.
GetDat(GroupByIntMapping.
GetKey(g));
1703 GroupRows = & GroupByStrMapping.
GetDat(GroupByStrMapping.
GetKey(g));
1707 GroupRows = & GroupByIntMapping_MP.
GetDat(GroupByIntMPKeys[g]);
1723 TIntV& ValidRows = *GroupRows;
1725 if (sz <= 0)
continue;
1728 for (
TInt i = 0; i < sz; i++) {
IntCols[ColIdx][ValidRows[i]] = sz; }
1733 for (
TInt i = 0; i < sz; i++) { V.
Add(
IntCols[AggrColIdx][ValidRows[i]]); }
1734 TInt Res = AggregateVector<TInt>(V, AggOp);
1735 if (AggOp ==
aaMean) { Res = Res / sz; }
1736 for (
TInt i = 0; i < sz; i++) {
IntCols[ColIdx][ValidRows[i]] = Res; }
1739 for (
TInt i = 0; i < sz; i++) { V.
Add(
FltCols[AggrColIdx][ValidRows[i]]); }
1740 TFlt Res = AggregateVector<TFlt>(V, AggOp);
1741 if (AggOp ==
aaMean) { Res /= sz; }
1742 for (
TInt i = 0; i < sz; i++) {
FltCols[ColIdx][ValidRows[i]] = Res; }
1752 for (
TInt i = 0; i < AggrAttrs.
Len(); i++) {
1754 if (Info[i].Val1 != Info[0].Val1) {
1755 TExcept::Throw(
"AggregateCols: Aggregation attributes must have the same type");
1759 if (Info[0].Val1 ==
atInt) {
1764 TInt RowIdx = RI.GetRowIdx();
1766 for (
TInt i = 0; i < AggrAttrs.
Len(); i++) {
1769 IntCols[ResIdx][RowIdx] = AggregateVector<TInt>(V, AggOp);
1771 }
else if (Info[0].Val1 ==
atFlt) {
1776 TInt RowIdx = RI.GetRowIdx();
1778 for (
TInt i = 0; i < AggrAttrs.
Len(); i++) {
1781 FltCols[ResIdx][RowIdx] = AggregateVector<TFlt>(V, AggOp);
1784 TExcept::Throw(
"AggregateCols: Only Int and Flt aggregation supported right now");
1793 for(
int i = 0; i < ik.
Len(); i++){ printf(
"%d ",ik[i].Val);}
1794 for(
int i = 0; i < fk.
Len(); i++){ printf(
"%f ",fk[i].Val);}
1797 for(
int i = 0; i < v.
Len(); i++){ printf(
"%d ",v[i].Val);}
1804 GroupByAttrs.
Add(CountColName);
1821 GroupAux(NGroupBy, Grouping, Ordered,
"",
false, UniqueVec);
1831 ColInfo.
Add(GroupTable->GetColTypeMap(
Sch[i].Val1));
1833 ColInfo[i].Val2 = -1;
1841 for (
TInt i = 0; i < Rows.
Len(); i++) {
1850 if (ColIdx == -1) {
continue; }
1853 switch (Info.
Val1) {
1855 GroupTable->IntCols[ColIdx].Add(
IntCols[V[c]][RowIdx]);
1858 GroupTable->FltCols[ColIdx].Add(
FltCols[V[c]][RowIdx]);
1861 GroupTable->StrColMaps[ColIdx].Add(
StrColMaps[V[c]][RowIdx]);
1866 if (GroupTable->LastValidRow >= 0) {
1867 GroupTable->Next[GroupTable->LastValidRow] = GroupTable->NumRows;
1869 GroupTable->Next.Add(GroupTable->Last);
1870 GroupTable->LastValidRow = GroupTable->NumRows;
1872 GroupTable->NumRows++;
1873 GroupTable->NumValidRows++;
1875 GroupTable->InitIds();
1876 Result.
Add(GroupTable);
1894 IntCols[IdColIdx][RI.GetRowIdx()] = IdCnt;
1908 IntCols[IdCol][RI.GetRowIdx()] = IdCnt;
1924 TStr CName = JointTable->RenumberColName(ColName);
1926 JointTable->AddColType(CName, TypeMap);
1928 JointTable->AddSchemaCol(CName, ColType);
1933 TStr CName = JointTable->RenumberColName(ColName);
1937 switch (NewDat.
Val1) {
1948 JointTable->AddColType(CName, NewDat);
1949 JointTable->AddSchemaCol(CName, ColType);
1953 JointTable->AddSchemaCol(IdColName,
atInt);
1998 if(Cols1.
Len()!=Cols2.
Len()){
2002 for (
TInt i = 0; i < Cols1.
Len(); i++) {
2016 TExcept::Throw(
"Column type not supported. Only Flt and Int column types are supported.");
2033 for(
TInt i = 0; i < Cols1.
Len(); i++) {
2034 float attrVal1, attrVal2;
2035 attrVal1 =
GetColType(Cols1[i])==
atInt ? (float)RowI.GetIntAttr(Cols1[i]) : (float)RowI.GetFltAttr(Cols1[i]);
2036 attrVal2 = Table.
GetColType(Cols2[i])==
atInt ? (float)RowI2.GetIntAttr(Cols2[i]) : (float)RowI2.GetFltAttr(Cols2[i]);
2037 distance += pow(attrVal1 - attrVal2, 2);
2040 distance = sqrt(distance);
2042 if(distance<=Threshold){
2043 JointTable->AddJointRow(*
this, Table, RowI.GetRowIdx(), RowI2.GetRowIdx());
2044 DistanceV.
Add(distance);
2053 TExcept::Throw(
"Haversine disance expects exactly two attributes - latitude and longitude - in that order.");
2058 float Latitude1 =
GetColType(Cols1[0])==
atInt ? (float)RowI.GetIntAttr(Cols1[0]) : (float)RowI.GetFltAttr(Cols1[0]);
2059 float Latitude2 = Table.
GetColType(Cols2[0])==
atInt ? (float)RowI2.GetIntAttr(Cols2[0]) : (float)RowI2.GetFltAttr(Cols2[0]);
2061 float Longitude1 =
GetColType(Cols1[1])==
atInt ? (float)RowI.GetIntAttr(Cols1[1]) : (float)RowI.GetFltAttr(Cols1[1]);
2062 float Longitude2 = Table.
GetColType(Cols2[1])==
atInt ? (float)RowI2.GetIntAttr(Cols2[1]) : (float)RowI2.GetFltAttr(Cols2[1]);
2064 Latitude1 *=
static_cast<float>(M_PI/180.0);
2065 Latitude2 *=
static_cast<float>(M_PI/180.0);
2066 Longitude1 *=
static_cast<float>(M_PI/180.0);
2067 Longitude2 *=
static_cast<float>(M_PI/180.0);
2069 float dlon = Longitude2 - Longitude1;
2070 float dlat = Latitude2 - Latitude1;
2071 float a = pow(sin(dlat/2), 2) + cos(Latitude1)*cos(Latitude2)*pow(sin(dlon/2), 2);
2072 float c = 2*atan2(sqrt(a), sqrt(1-a));
2073 distance = (
static_cast<float>(Radius.
Val))*c;
2075 if(distance<=Threshold){
2076 JointTable->AddJointRow(*
this, Table, RowI.GetRowIdx(), RowI2.GetRowIdx());
2077 DistanceV.
Add(distance);
2089 JointTable->StoreFltCol(DistanceColName, DistanceV);
2090 JointTable->InitIds();
2105 for(
TInt i=0;i<2;i++){
2109 JointTable->AddColType(CName, Group);
2110 JointTable->AddSchemaCol(CName,
atInt);
2114 JointTable->AddColType(DistanceColName, Group);
2115 JointTable->AddSchemaCol(DistanceColName,
atFlt);
2124 TInt GroupId =
IntCols[GroupColIdx][RowI.GetRowIdx()];
2128 if(!TIntHH.
IsKey(GroupId)){
2130 TIntHH.
AddDat(GroupId, TIntH);
2135 TIntH.
AddDat(SimAttrVal, 0);
2146 TInt GroupId1 = it1.GetKey();
2149 int intersectionCount = 0;
2150 TInt GroupId2 = it2.GetKey();
2155 TInt Val = it.GetKey();
2156 if(Vals2H.
IsKey(Val)){
2157 intersectionCount+=1;
2161 int unionCount = Vals1H.
Len() + Vals2H.
Len() - intersectionCount;
2162 float distance = 1.0f - (float)intersectionCount/unionCount;
2165 if(distance<=Threshold){
2166 JointTable->IntCols[0].Add(GroupId1);
2167 JointTable->IntCols[1].Add(GroupId2);
2168 JointTable->FltCols[0].Add(distance);
2169 JointTable->IncrementNext();
2174 JointTable->InitIds();
2181 const TStr& DistanceColName,
const TSimType& SimType,
const TFlt& Threshold) {
2186 for(
TInt i=0; i<GroupBy.
Len(); i++)
2188 ProjectionV.
Add(GroupBy[i]);
2191 ProjectionV.
Add(SimCol);
2194 TStr CName =
"Group";
2197 GroupAux(NGroupBy, Grouping,
false, CName,
false, UniqueVec);
2211 if(!GroupIdH.
IsKey(GroupNum))
2213 TInt RandomRowId = RowIds[0];
2214 GroupIdH.
AddDat(GroupNum, RandomRowId);
2218 for(
TRowIterator RowI = GroupJointTable->BegRI(); RowI < GroupJointTable->EndRI(); RowI++)
2221 TInt GroupId1 = GroupJointTable->IntCols[0][RowI.GetRowIdx()];
2222 TInt GroupId2 = GroupJointTable->IntCols[1][RowI.GetRowIdx()];
2227 JointTable->AddJointRow(*
this, *
this, RowId1, RowId2);
2232 JointTable->StoreFltCol(DistanceColName, GroupJointTable->FltCols[0]);
2234 ProjectionV.
Add(DistanceColName);
2238 for(
TInt i=0; i<GroupBy.
Len(); i++){
2239 for(
TInt j=0; j<JointTable->Sch.Len(); j++)
2241 TStr ColName = JointTable->Sch[j].Val1;
2242 if(ColName.
IsStrIn(GroupBy[i]))
2244 ProjectionV.
Add(ColName);
2249 JointTable->ProjectInPlace(ProjectionV);
2250 JointTable->InitIds();
2276 printf(
"no such column %s\n", Col1.
CStr());
2280 printf(
"no such column %s\n", Col2.
CStr());
2284 printf(
"Trying to Join on columns of different type\n");
2293 const TTable& TS = ThisIsSmaller ? *
this : Table;
2294 const TTable& TB = ThisIsSmaller ? Table : *
this;
2295 TStr ColS = ThisIsSmaller ? Col1 : Col2;
2296 TStr ColB = ThisIsSmaller ? Col2 : Col1;
2313 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2318 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
2319 for (
int i = 0; i < Partitions.
Len(); i++){
2321 JointRowIDSet[i].
Reserve(PartitionSize);
2324 while (RowI < EndI) {
2328 for(
TInt j = 0; j < Group.
Len(); j++){
2344 JointTable->AddNJointRowsMP(*
this, Table, JointRowIDSet);
2355 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2358 #pragma omp parallel for schedule(dynamic)
2359 for (
int i = 0; i < Partitions.
Len(); i++){
2360 JointRowIDSet[i].
Reserve(PartitionSize);
2363 while (RowI < EndI) {
2367 for(
TInt j = 0; j < Group.
Len(); j++){
2378 JointTable->AddNJointRowsMP(*
this, Table, JointRowIDSet);
2387 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2390 #pragma omp parallel for schedule(dynamic)
2391 for (
int i = 0; i < Partitions.
Len(); i++){
2392 JointRowIDSet[i].
Reserve(PartitionSize);
2395 while (RowI < EndI) {
2399 for(
TInt j = 0; j < Group.
Len(); j++){
2410 JointTable->AddNJointRowsMP(*
this, Table, JointRowIDSet);
2415 #endif // GCC_ATOMIC
2421 TInt K = RowI.GetIntAttr(ColBId);
2424 for (
TInt i = 0; i < Group.
Len(); i++) {
2425 if (ThisIsSmaller) {
2426 JointTable->AddJointRow(*
this, Table, Group[i], RowI.GetRowIdx());
2428 JointTable->AddJointRow(*
this, Table, RowI.GetRowIdx(), Group[i]);
2439 TFlt K = RowI.GetFltAttr(ColBId);
2442 for (
TInt i = 0; i < Group.
Len(); i++) {
2443 if (ThisIsSmaller) {
2444 JointTable->AddJointRow(*
this, Table, Group[i], RowI.GetRowIdx());
2446 JointTable->AddJointRow(*
this, Table, RowI.GetRowIdx(), Group[i]);
2457 TInt K = RowI.GetStrMapById(ColBId);
2460 for (
TInt i = 0; i < Group.
Len(); i++) {
2461 if (ThisIsSmaller) {
2462 JointTable->AddJointRow(*
this, Table, Group[i], RowI.GetRowIdx());
2464 JointTable->AddJointRow(*
this, Table, RowI.GetRowIdx(), Group[i]);
2479 const TStr& KeyCol2,
const TStr& JoinCol2){
2481 printf(
"no such column %s\n", KeyCol1.
CStr());
2485 printf(
"no such column %s\n", KeyCol2.
CStr());
2489 printf(
"no such column %s\n", JoinCol1.
CStr());
2493 printf(
"no such column %s\n", JoinCol2.
CStr());
2497 printf(
"Trying to Join on columns of different type\n");
2501 printf(
"Key type mismatch\n");
2513 if(JoinColType ==
atStr){
2514 JVal = RowI.GetStrMapById(JoinColIdxB);
2516 JVal = RowI.GetIntAttr(JoinColIdxB);
2522 if(KeyType ==
atStr){
2523 KeyB = RowI.GetStrMapById(KeyColIdxB);
2525 KeyB = RowI.GetIntAttr(KeyColIdxB);
2529 for(
int i = 0; i < RelevantRows.
Len(); i++){
2532 if(KeyType ==
atStr){
2533 KeyS = TS.
StrColMaps[KeyColIdxS][RelevantRows[i]];
2535 KeyS = TS.
IntCols[KeyColIdxS][RelevantRows[i]];
2539 if(Counters.
IsKey(Keys)){
2547 Counters.
AddDat(Keys,
TIntTr(RelevantRows[i], RowI.GetRowIdx(),1));
2549 Counters.
AddDat(Keys,
TIntTr(RowI.GetRowIdx(), RelevantRows[i],1));
2563 if(JoinColType ==
atStr){
2564 JVal = RowI.GetStrMapById(JoinColIdxB);
2566 JVal = RowI.GetIntAttr(JoinColIdxB);
2572 if(KeyType ==
atStr){
2573 KeyB = RowI.GetStrMapById(KeyColIdxB);
2575 KeyB = RowI.GetIntAttr(KeyColIdxB);
2579 for(
int i = 0; i < RelevantRows.
Len(); i++){
2582 if(KeyType ==
atStr){
2583 KeyS = TS.
StrColMaps[KeyColIdxS][RelevantRows[i]];
2585 KeyS = TS.
IntCols[KeyColIdxS][RelevantRows[i]];
2590 if(Counters.
IsKey(K)){
2598 Counters.
AddDat(K,
TIntTr(RelevantRows[i], RowI.GetRowIdx(),1));
2600 Counters.
AddDat(K,
TIntTr(RowI.GetRowIdx(), RelevantRows[i],1));
2612 TIntTr& Counter = iter.GetDat();
2615 if(Counter.
Val3 >= Threshold){
2616 JointTable->AddJointRow(*
this, Table, Counter.
Val1, Counter.
Val2);
2625 const TIntTr& Counter = iter.GetDat();
2626 const TIntTr& Keys = iter.GetKey();
2628 if(Counter.
Val3 >= Threshold){
2630 if(!Pairs.
IsKey(K)){
2632 JointTable->AddJointRow(*
this, Table, Counter.
Val1, Counter.
Val2);
2655 const TTable& TS = ThisIsSmaller ? *
this : Table;
2656 const TTable& TB = ThisIsSmaller ? Table : *
this;
2657 TStr JoinColS = JoinCol1;
2662 JoinColS = JoinCol2;
2674 printf(
"ThresholdJoin only supports integer or string key attributes\n");
2675 TExcept::Throw(
"ThresholdJoin only supports integer or string key attributes");
2677 if(JoinColType !=
atInt && JoinColType !=
atStr){
2678 printf(
"ThresholdJoin only supports integer or string join attributes\n");
2679 TExcept::Throw(
"ThresholdJoin only supports integer or string join attributes");
2684 if(JoinColType ==
atInt){
2686 }
else if(JoinColType ==
atStr){
2689 TExcept::Throw(
"ThresholdJoin only supports integer or string join attributes");
2754 TInt NumRelevantCols = RelevantCols.
Len();
2757 for (
TInt i = 0; i < NumRelevantCols; i++) {
2759 ColIndices[i] =
GetColIdx(RelevantCols[i]);
2766 for (
TInt i = 0; i < NumRelevantCols; i++) {
2767 switch (ColTypes[i]) {
2779 if (!Predicate.
Eval()) {
2787 for (
TInt i = 0; i < NumRelevantCols; i++) {
2788 switch (ColTypes[i]) {
2790 Predicate.
SetIntVal(RelevantCols[i], RowI.GetIntAttr(RelevantCols[i]));
2793 Predicate.
SetFltVal(RelevantCols[i], RowI.GetFltAttr(RelevantCols[i]));
2796 Predicate.
SetStrVal(RelevantCols[i], RowI.GetStrAttr(RelevantCols[i]));
2800 if (Predicate.
Eval()) { SelectedRows.
Add(RowI.GetRowIdx()); }
2807 Select(Predicate, SelectedRows,
false);
2808 ClassifyAux(SelectedRows, LabelName, PositiveLabel, NegativeLabel);
2861 if (Result) { SelectedRows.
Add(RowI.GetRowIdx()); }
2867 const TStr& LabelName,
const TInt& PositiveLabel,
const TInt& NegativeLabel) {
2870 ClassifyAux(SelectedRows, LabelName, PositiveLabel, NegativeLabel);
2881 TExcept::Throw(
"SelectAtomicConst: coltype does not match const type");
2891 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2892 int RemoveCount = 0;
2899 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) reduction(+:RemoveCount)
2900 for (
int i = 0; i < Partitions.
Len(); i++){
2907 while (RowI < EndI) {
2910 if (Type !=
atStr) {
2920 if (First) { FirstRowIdx = CurrRowIdx; First =
false; }
2921 else {
Next[LastRowIdx] = CurrRowIdx; }
2922 LastRowIdx = CurrRowIdx;
2925 Bounds[i] =
TIntPr(FirstRowIdx, LastRowIdx);
2934 while (CurrBound < Bounds.Len() && Bounds[CurrBound].Val1 ==
TTable::Invalid) {
2937 if (CurrBound == Bounds.Len()) {
2947 TInt PrevBound = CurrBound;
2949 while (CurrBound < Bounds.Len()) {
2950 if (Bounds[CurrBound].Val1 == TTable::Invalid) { CurrBound++;
continue; }
2951 Next[Bounds[PrevBound].Val2] = Bounds[CurrBound].Val1;
2953 PrevBound = CurrBound;
2982 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2986 int TotalSelectedRows = 0;
2987 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) reduction(+:TotalSelectedRows)
2988 for (
int i = 0; i < Partitions.
Len(); i++){
2991 while (RowI < EndI) {
2992 if (Type !=
atStr) {
2994 TotalSelectedRows++;
2998 TotalSelectedRows++;
3007 SelectedTable->ResizeTable(TotalSelectedRows);
3011 if (TotalSelectedRows == 0) {
3016 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
3017 for (
int i = 0; i < Partitions.
Len(); i++){
3018 TIntV LocalSelectedRows;
3019 LocalSelectedRows.
Reserve(PartitionSize);
3022 while (RowI < EndI) {
3023 if (Type !=
atStr) {
3034 SelectedTable->AddSelectedRows(*
this, LocalSelectedRows);
3044 SelectedTable->SetFirstValidRow();
3048 if (RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
3049 SelectedTable->AddRowI(RowI);
3057 if (RowI.CompareAtomicConst(ColIdx, Val, Cmp)) {
3058 SelectedRows.
Add(RowI.GetRowIdx());
3066 switch (CompareByType) {
3068 if (
IntCols[CompareByIndex][R1] >
IntCols[CompareByIndex][R2]) {
return (Asc ? 1 : -1); }
3069 if (
IntCols[CompareByIndex][R1] <
IntCols[CompareByIndex][R2]) {
return (Asc ? -1 : 1); }
3073 if (
FltCols[CompareByIndex][R1] >
FltCols[CompareByIndex][R2]) {
return (Asc ? 1 : -1); }
3074 if (
FltCols[CompareByIndex][R1] <
FltCols[CompareByIndex][R2]) {
return (Asc ? -1 : 1); }
3080 int CmpRes = strcmp(S1.
CStr(), S2.
CStr());
3081 return (Asc ? CmpRes : -CmpRes);
3089 for (
TInt i = 0; i < CompareByTypes.
Len(); i++) {
3090 TInt res =
CompareRows(R1, R2, CompareByTypes[i], CompareByIndices[i], Asc);
3091 if (res != 0) {
return res; }
3097 if (StartIdx < EndIdx) {
3098 for (
TInt i = StartIdx+1; i <= EndIdx; i++) {
3101 while ((StartIdx < j) && (
CompareRows(V[j-1], Val, SortByTypes, SortByIndices, Asc) > 0)) {
3111 TInt L = EndIdx - StartIdx + 1;
3115 if (
CompareRows(V[Idx1], V[Idx2], SortByTypes, SortByIndices, Asc) < 0) {
3116 if (
CompareRows(V[Idx2], V[Idx3], SortByTypes, SortByIndices, Asc) < 0) {
return Idx2; }
3117 if (
CompareRows(V[Idx1], V[Idx3], SortByTypes, SortByIndices, Asc) < 0) {
return Idx3; }
3120 if (
CompareRows(V[Idx3], V[Idx2], SortByTypes, SortByIndices, Asc) < 0) {
return Idx2; }
3121 if (
CompareRows(V[Idx3], V[Idx1], SortByTypes, SortByIndices, Asc) < 0) {
return Idx3; }
3130 for (j = StartIdx; j < EndIdx; j++) {
3131 if (
CompareRows(V[j], V[j+1], SortByTypes, SortByIndices, Asc) > 0) {
3139 TInt PivotIdx =
GetPivot(V, StartIdx, EndIdx, SortByTypes, SortByIndices, Asc);
3140 TInt Pivot = V[PivotIdx];
3141 V.
Swap(PivotIdx, EndIdx);
3142 TInt StoreIdx = StartIdx;
3143 for (
TInt i = StartIdx; i < EndIdx; i++) {
3144 if (
CompareRows(V[i], Pivot, SortByTypes, SortByIndices, Asc) <= 0) {
3145 V.
Swap(i, StoreIdx);
3150 V.
Swap(StoreIdx, EndIdx);
3155 if (StartIdx < EndIdx) {
3156 if (EndIdx - StartIdx < 20) {
3157 ISort(V, StartIdx, EndIdx, SortByTypes, SortByIndices, Asc);
3159 TInt Pivot =
Partition(V, StartIdx, EndIdx, SortByTypes, SortByIndices, Asc);
3160 if (Pivot > EndIdx) {
3169 V[Ub], V[Pivot], SortByTypes, SortByIndices, Asc) == 0) {
3172 QSort(V, StartIdx, Ub, SortByTypes, SortByIndices, Asc);
3173 QSort(V, Pivot+1, EndIdx, SortByTypes, SortByIndices, Asc);
3179 TInt i = Idx1, j = Idx2;
3181 while (i < Idx2 && j < Idx3) {
3182 if (
CompareRows(V[i], V[j], SortByTypes, SortByIndices, Asc) <= 0) {
3200 for (
TInt sz = 0; sz < Idx3 - Idx1; sz++) {
3201 V[Idx1 + sz] = SortedV[sz];
3207 TInt NumThreads = 8;
3210 for (
TInt i = 0; i < NumThreads; i++) {
3211 IndV.
Add(i * (Sz / NumThreads));
3215 omp_set_num_threads(NumThreads);
3216 #pragma omp parallel for
3217 for (
int i = 0; i < NumThreads; i++) {
3218 QSort(V, IndV[i], IndV[i+1] - 1, SortByTypes, SortByIndices, Asc);
3221 while (NumThreads > 1) {
3222 omp_set_num_threads(NumThreads / 2);
3223 #pragma omp parallel for
3224 for (
int i = 0; i < NumThreads; i += 2) {
3225 Merge(V, IndV[i], IndV[i+1], IndV[i+2], SortByTypes, SortByIndices, Asc);
3229 for (
TInt i = 0; i < NumThreads; i+=2) {
3235 NumThreads = NumThreads / 2;
3238 #endif // USE_OPENMP
3250 ValidRows[i] = RI.GetRowIdx();
3255 TIntV OrderByIndices(OrderBy.
Len());
3256 for (
TInt i = 0; i < OrderBy.
Len(); i++) {
3258 OrderByIndices[i] =
GetColIdx(OrderBy[i]);
3264 QSortPar(ValidRows, OrderByTypes, OrderByIndices, Asc);
3280 Next[ValidRows[i]] = ValidRows[i+1];
3282 if (NumValidRows > 0) {
3283 Next[ValidRows[NumValidRows-1]] =
Last;
3290 if (!OrderColName.
Empty()) {
3293 RankCol[ValidRows[i]] = i;
3295 if (ResetRankByMSC) {
3297 TStr GroupName = OrderBy[0];
3299 RankCol[ValidRows[i]] = 0;
3301 RankCol[ValidRows[i]] = RankCol[ValidRows[i-1]] + 1;
3318 if (
Next[i] != TTable::Invalid) {
3320 if (FreeIndex == 0) {
3326 Next[FreeIndex] = FreeIndex + 1;
3327 Mapping.
Add(FreeIndex);
3349 Mapping.
Add(TTable::Invalid);
3365 if (!(RowI <
EndRI())) {
3377 TInt CurrId = LastId;
3389 if (!NodeVals.
IsKey(NodeId)) {
3390 Graph->AddNode(NodeId);
3402 Graph->AddIntAttrDatE(RowId,
IntCols[Index][RowId], ColName);
3405 Graph->AddFltAttrDatE(RowId,
FltCols[Index][RowId], ColName);
3408 Graph->AddStrAttrDatE(RowId,
GetStrValIdx(Index, RowId), ColName);
3416 for (
TInt i = 0; i < NodeAttrV.
Len(); i++) {
3417 TStr ColAttr = NodeAttrV[i];
3428 if (!NodeIntAttrs.
IsKey(NId)) { NodeIntAttrs.
AddKey(NId); }
3431 }
else if (CT ==
atFlt) {
3432 if (!NodeFltAttrs.
IsKey(NId)) { NodeFltAttrs.
AddKey(NId); }
3436 if (!NodeStrAttrs.
IsKey(NId)) { NodeStrAttrs.
AddKey(NId); }
3464 TInt CurrRowIdx = *it;
3468 if (NodeType ==
atFlt) {
3473 }
else if (NodeType ==
atInt || NodeType ==
atStr) {
3474 if (NodeType ==
atInt) {
3475 SVal =
IntCols[SrcColIdx][CurrRowIdx];
3476 DVal =
IntCols[DstColIdx][CurrRowIdx];
3483 if (!Graph->IsNode(SVal)) { Graph->AddNode(SVal); }
3484 if (!Graph->IsNode(DVal)) { Graph->AddNode(DVal); }
3490 Graph->AddEdge(SVal, DVal, CurrRowIdx);
3504 for (
TNEANet::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
3505 TInt NId = NodeI.GetId();
3506 if (NodeIntAttrs.
IsKey(NId)) {
3510 Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
3513 if (NodeFltAttrs.
IsKey(NId)) {
3517 Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
3520 if (NodeStrAttrs.
IsKey(NId)) {
3524 Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
3542 for (
TInt i = 0; i < NumBuckets; i++) {
3548 Assert (JumpSize <= WindowSize);
3549 int NumBuckets, MinBucket, MaxBucket;
3558 if (MinValue >
IntCols[SplitColId][i]) {
3559 MinValue =
IntCols[SplitColId][i];
3561 if (MaxValue <
IntCols[SplitColId][i]) {
3562 MaxValue =
IntCols[SplitColId][i];
3567 if (StartVal ==
TInt::Mn) StartVal = MinValue;
3568 if (EndVal ==
TInt::Mx) EndVal = MaxValue;
3574 NumBuckets = (EndVal - StartVal)/JumpSize + 1;
3582 int SplitVal =
IntCols[SplitColId][i];
3583 if (SplitVal < StartVal || SplitVal > EndVal) {
continue; }
3584 int RowVal = SplitVal - StartVal;
3585 if (JumpSize == 0) {
3586 MinBucket = RowVal/WindowSize;
3587 MaxBucket = NumBuckets-1;
3588 }
else if (JumpSize == WindowSize) {
3589 MinBucket = MaxBucket = RowVal/JumpSize;
3591 if (RowVal < WindowSize) { MinBucket = 0; }
3592 else { MinBucket = (RowVal-WindowSize)/JumpSize + 1; }
3593 MaxBucket = RowVal/JumpSize;
3601 int NumBuckets = SplitIntervals.
Len();
3607 int SplitVal =
IntCols[SplitColId][i];
3608 for (
TInt j = 0; j < SplitIntervals.
Len(); j++) {
3609 if (SplitVal >= SplitIntervals[j].Val1 && SplitVal < SplitIntervals[j].Val2) {
3622 GraphSequence.
Add(PNet);
3625 return GraphSequence;
3653 printf(
"buckets filled\n");
3701 for (
TInt i = 0; i < IntAttrNames.
Len(); i++) {
3704 for (
TInt i = 0; i < FltAttrNames.
Len(); i++) {
3707 for (
TInt i = 0; i < StrAttrNames.
Len(); i++) {
3715 while (NodeI < Network->EndNI()) {
3716 T->IntCols[0].Add(NodeI.
GetId());
3717 for (
TInt i = 0; i < IntAttrNames.
Len(); i++) {
3718 T->IntCols[i+1].Add(Network->GetIntAttrDatN(NodeI,IntAttrNames[i]));
3720 for (
TInt i = 0; i < FltAttrNames.
Len(); i++) {
3721 T->FltCols[i].Add(Network->GetFltAttrDatN(NodeI,FltAttrNames[i]));
3723 for (
TInt i = 0; i < StrAttrNames.
Len(); i++) {
3724 T->AddStrVal(i, Network->GetStrAttrDatN(NodeI,StrAttrNames[i]));
3731 T->NumValidRows = T->NumRows;
3732 T->Next =
TIntV(T->NumRows,0);
3733 for (
TInt i = 0; i < T->NumRows-1; i++) {
3736 T->LastValidRow = T->NumRows-1;
3755 for (
TInt i = 0; i < IntAttrNames.
Len(); i++) {
3758 for (
TInt i = 0; i < FltAttrNames.
Len(); i++) {
3761 for (
TInt i = 0; i < StrAttrNames.
Len(); i++) {
3770 while (EdgeI < Network->EndEI()) {
3771 T->IntCols[0].Add(EdgeI.
GetId());
3774 for (
TInt i = 0; i < IntAttrNames.
Len(); i++) {
3775 T->IntCols[i+3].Add(Network->GetIntAttrDatE(EdgeI,IntAttrNames[i]));
3777 for (
TInt i = 0; i < FltAttrNames.
Len(); i++) {
3778 T->FltCols[i].Add(Network->GetFltAttrDatE(EdgeI,FltAttrNames[i]));
3780 for (
TInt i = 0; i < StrAttrNames.
Len(); i++) {
3781 T->AddStrVal(i, Network->GetStrAttrDatE(EdgeI,StrAttrNames[i]));
3788 T->NumValidRows = T->NumRows;
3789 T->Next =
TIntV(T->NumRows,0);
3790 for (
TInt i = 0; i < T->NumRows-1; i++) {
3793 T->LastValidRow = T->NumRows-1;
3806 TInt NumEdges = Network->GetEdges();
3807 TInt NumPartitions = omp_get_max_threads()*CHUNKS_PER_THREAD;
3808 TInt PartitionSize = NumEdges/NumPartitions;
3809 if (PartitionSize*NumPartitions < NumEdges) { NumPartitions++;}
3813 TIntV PartitionSizes;
3816 while (FirstEI < Network->EndEI()){
3817 if (currCount == PartitionSize) {
3818 Partitions.
Add(TEIPr(currStart, FirstEI));
3819 currStart = FirstEI;
3820 PartitionSizes.
Add(currCount);
3828 Partitions.
Add(TEIPr(currStart, FirstEI));
3829 PartitionSizes.
Add(currCount);
3831 T->ResizeTable(NumEdges);
3832 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
3833 for (
int p = 0; p < Partitions.
Len(); p++) {
3837 int start = T->GetEmptyRowsStart(PartitionSizes[p]);
3838 while (EdgeI < EndI) {
3839 T->IntCols[0][start] = EdgeI.
GetSrcNId();
3840 T->IntCols[1][start] = EdgeI.
GetDstNId();
3842 if (EdgeI < EndI) { T->Next[start] = start+1;}
3847 Assert(T->NumRows == NumEdges);
3850 #endif // GCC_ATOMIC
3853 const TStr& NodeAttrName,
const TAttrType& NodeAttrType,
const TStr& PropertyAttrName,
3860 TInt NodeColIdx = T->GetColIdx(NodeAttrName);
3863 for (
TNEANet::TNodeI NodeI = Network->BegNI(); NodeI < Network->EndNI(); NodeI++) {
3864 switch (NodeAttrType) {
3866 T->IntCols[NodeColIdx].Add(Network->GetIntAttrDatN(NodeI,NodeAttrName));
3869 T->FltCols[NodeColIdx].Add(Network->GetFltAttrDatN(NodeI,NodeAttrName));
3872 T->AddStrVal(
TInt(0), Network->GetStrAttrDatN(NodeI,NodeAttrName));
3875 T->FltCols[0].Add(Property.
GetDat(NodeI.GetId()));
3880 T->NumValidRows = T->NumRows;
3881 T->Next =
TIntV(T->NumRows,0);
3882 for (
TInt i = 0; i < T->NumRows-1; i++) {
3885 T->LastValidRow = T->NumRows-1;
3893 if (GroupBy.
Empty()) {
3894 OrderBy.
Add(OrderCol);
3896 OrderBy.
Add(GroupBy);
3897 OrderBy.
Add(OrderCol);
3899 if (RankColName.
Empty()) {
3902 Order(OrderBy, RankColName,
true);
3907 TInt Succ = RI.GetRowIdx();
3908 TBool OutOfGroup =
false;
3909 for (
TInt i = 0; i < K; i++) {
3911 if (Succ ==
Last) {
break; }
3912 switch (GroupByAttrType) {
3914 if (
GetIntVal(GroupBy, Succ) != RI.GetIntAttr(GroupBy)) { OutOfGroup =
true; }
3917 if (
GetFltVal(GroupBy, Succ) != RI.GetFltAttr(GroupBy)) { OutOfGroup =
true; }
3920 if (
GetStrVal(GroupBy, Succ) != RI.GetStrAttr(GroupBy)) { OutOfGroup =
true; }
3923 if (OutOfGroup) {
break; }
3924 T->AddJointRow(*
this, *
this, RI.GetRowIdx(), Succ);
3931 printf(
"Total number of rows: %d\n",
NumRows.
Val);
3933 printf(
"Number of Int columns: %d\n",
IntCols.
Len());
3934 printf(
"Number of Flt columns: %d\n",
FltCols.
Len());
3937 printf(
"Approximate table size is %s KB\n",
TUInt64::GetStr(MemUsed).CStr());
3941 TSize ApproxSize = 0;
3960 printf(
"Number of strings in pool: ");
3962 printf(
"Number of entries in hash table: ");
3965 printf(
"Approximate context size is %s KB\n",
3970 TSize ApproxSize = 0;
3983 if (TColIdx < 0) {
TExcept::Throw(
"when adding a table, it must contain all columns of source table!"); }
3998 for (
TInt i = 0; i < TNext.
Len(); i++) {
4020 TIntV IntGroupByCols;
4021 TIntV FltGroupByCols;
4022 TIntV StrGroupByCols;
4024 TInt IKLen, FKLen, SKLen;
4034 if (
Sch[c] != Table.
Sch[c]) {
4035 printf(
"(%s,%d) != (%s,%d)\n",
Sch[c].Val1.CStr(),
Sch[c].Val2, Table.
Sch[c].Val1.CStr(), Table.
Sch[c].Val2);
4040 switch (ColType.
Val1) {
4042 IntGroupByCols.
Add(ColType.
Val2);
4045 FltGroupByCols.
Add(ColType.
Val2);
4048 StrGroupByCols.
Add(ColType.
Val2);
4053 IKLen = IntGroupByCols.
Len();
4054 FKLen = FltGroupByCols.
Len();
4055 SKLen = StrGroupByCols.
Len();
4058 GroupAux(GroupBy, Grouping,
true,
"",
false, UniqueVec,
true);
4063 TIntV IKey(IKLen + SKLen, 0);
4064 TFltV FKey(FKLen, 0);
4067 for (
TInt c = 0; c < IKLen; c++) {
4068 IKey.
Add(it.GetIntAttr(IntGroupByCols[c]));
4070 for (
TInt c = 0; c < FKLen; c++) {
4071 FKey.
Add(it.GetFltAttr(FltGroupByCols[c]));
4073 for (
TInt c = 0; c < SKLen; c++) {
4074 IKey.
Add(it.GetStrMapById(StrGroupByCols[c]));
4079 TInt RowIdx = it.GetRowIdx();
4080 if (Grouping.
IsKey(GroupKey)) {
4082 Collisions.
AddKey(RowIdx);
4089 printf(
"new column dimension must agree with number of rows\n");
4097 IntCols[ColIdx][RI.GetRowIdx()] = ColVals[i];
4106 printf(
"new column dimension must agree with number of rows\n");
4114 FltCols[ColIdx][RI.GetRowIdx()] = ColVals[i];
4123 printf(
"new column dimension must agree with number of rows\n");
4156 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
4157 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
4158 for (
int i = 0; i < Partitions.
Len(); i++){
4171 return(__sync_bool_compare_and_swap(lock, 0, 1));
4176 TFlt DefaultFltVal) {
4200 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
4210 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD) // num_threads(1)
4211 for (
int i = 0; i < Partitions.
Len(); i++) {
4214 while (RowI < EndI) {
4216 if (Grouping.
IsKey(K)) {
4218 for (
int j = 0; j < UpdateRows.
Len(); j++) {
4219 int* lock = &Locks[UpdateRows[j]].Val;
4240 #endif // GCC_ATOMIC
4243 const TStr& FKeyAttr,
const TStr& ReadAttr,
TFlt DefaultFltVal){
4254 #endif // GCC_ATOMIC
4269 FltCols[UpdateColIdx][iter.GetRowIdx()] = DefaultFltVal;
4278 TInt K = RI.GetIntAttr(NFKeyAttr);
4279 if (Grouping.
IsKey(K)) {
4281 for (
int i = 0; i < UpdateRows.
Len(); i++) {
4282 FltCols[UpdateColIdx][UpdateRows[i]] = RI.GetFltAttr(NReadAttr);
4318 for (
TInt c = 0; c < IntVals.
Len(); c++) {
4321 for (
TInt c = 0; c < FltVals.
Len(); c++) {
4324 for (
TInt c = 0; c < StrVals.
Len(); c++) {
4331 if (RowCount == 0) {
4342 #pragma omp parallel for schedule(static)
4344 for (
int i = 0; i < TotalCols+1; i++) {
4345 if (i < FltOffset) {
4347 }
else if (i < StrOffset) {
4349 }
else if (i < TotalCols) {
4355 }
else if (
Next.
Len() > RowCount) {
4360 #pragma omp parallel for schedule(static)
4362 for (
int i = 0; i < TotalCols+1; i++) {
4363 if (i < FltOffset) {
4365 }
else if (i < StrOffset) {
4367 }
else if (i < TotalCols) {
4379 #pragma omp critical
4400 int NewRows = RowIDs.
Len();
4401 if (NewRows == 0) {
return; }
4404 for (
TInt r = 0; r < NewRows; r++) {
4405 TInt CurrRowIdx = RowIDs[r];
4416 for (
TInt r = 0; r < NewRows-1; r++) {
4417 Next[start+r] = start+r+1;
4422 if (NewRows == 0) {
return; }
4425 for (
TInt r = 0; r < NewRows; r++) {
4426 for (
TInt i = 0; i < IntColsP.
Len(); i++) {
4427 IntCols[i][start+r] = IntColsP[i][r];
4429 for (
TInt i = 0; i < FltColsP.
Len(); i++) {
4430 FltCols[i][start+r] = FltColsP[i][r];
4432 for (
TInt i = 0; i < StrColMapsP.
Len(); i++) {
4436 for (
TInt r = 0; r < NewRows-1; r++) {
4437 Next[start+r] = start+r+1;
4444 int JointTableSize = 0;
4445 TIntV StartOffsets(JointRowIDSet.
Len());
4446 for (
int i = 0; i < JointRowIDSet.
Len(); i++) {
4447 StartOffsets[i] = JointTableSize;
4448 JointTableSize += JointRowIDSet[i].
Len();
4450 if (JointTableSize == 0) {
4468 for (
TInt IdCnt = 0; IdCnt < JointTableSize; IdCnt++) {
4472 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
4473 for (
int j = 0; j < JointRowIDSet.
Len(); j++) {
4474 const TIntPrV& RowIDs = JointRowIDSet[j];
4475 int start = StartOffsets[j];
4476 int NewRows = RowIDs.
Len();
4477 if (NewRows == 0) {
continue;}
4478 for (
TInt r = 0; r < NewRows; r++){
4479 TIntPr CurrRowIdPr = RowIDs[r];
4498 IntCols[IdOffset][start+r] = start+r;
4500 for(
TInt r = 0; r < NewRows; r++){
4501 Next[start+r] = start+r+1;
4509 #endif // USE_OPENMP
4519 result->AddTable(*
this);
4520 result->UnionAllInPlace(Table);
4539 ColNames.
Add(
Sch[c].Val1);
4546 result->AddTable(*
this);
4548 result->Unique(ColNames);
4552 if (!Collisions.
IsKey(it.GetRowIdx())) {
4553 result->AddRowI(it);
4582 if (Collisions.
IsKey(it.GetRowIdx())) {
4583 result->AddRowI(it);
4607 if (!Collisions.
IsKey(it.GetRowIdx())) {
4608 result->AddRowI(it);
4617 for (
TInt c = 0; c < ProjectCols.
Len(); c++) {
4623 result->AddTable(*
this);
4633 TStr NColName = ColName;
4634 if (NColName.
GetCh(NColName.
Len()-2) ==
'-') {
4639 if (NColName ==
Sch[i].Val1.GetSubStr(0,
Sch[i].Val1.
Len()-3)) {
4644 NColName = NColName +
"-" + Conflicts.
GetStr();
4649 TStr DColName = ColName;
4650 if (DColName.
Len() == 0) {
return DColName; }
4651 if (DColName.
GetCh(0) ==
'_') {
return DColName; }
4652 if (DColName.
GetCh(DColName.
Len()-2) ==
'-') {
4657 if (DColName ==
Sch[i].Val1.GetSubStr(0,
Sch[i].Val1.
Len()-3)) {
4661 if (Conflicts > 1) {
return ColName; }
4662 else {
return DColName; }
4700 IntCols[LabelColIdx][i] = NegativeLabel;
4702 for (
TInt i = 0; i < SelectedRows.
Len(); i++) {
4703 IntCols[LabelColIdx][SelectedRows[i]] = PositiveLabel;
4713 TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
4714 #pragma omp parallel for schedule(dynamic, CHUNKS_PER_THREAD)
4715 for (
int i = 0; i < Partitions.
Len(); i++){
4719 if(ResType ==
atInt){
4744 #endif // USE_OPENMP
4760 if (Arg1Type ==
atStr || Arg2Type ==
atStr) {
4761 TExcept::Throw(
"Only numeric columns supported in arithmetic operations.");
4763 if(Arg1Type ==
atInt && Arg2Type ==
atFlt && ResAttr ==
""){
4764 TExcept::Throw(
"Trying to write float values to an existing int-typed column");
4771 TInt ColIdx3 = ColIdx1;
4773 if (ResAttr !=
"") {
4774 if (Arg1Type ==
atInt && Arg2Type ==
atInt) {
4784 ColGenericOpMP(ColIdx1, ColIdx2, Arg1Type, Arg2Type, ColIdx3, op);
4789 if(Arg1Type ==
atInt && Arg2Type ==
atInt){ printf(
"hooray!\n"); ResType =
atInt;}
4792 if(ResType ==
atInt){
4793 TInt V1 = RowI.GetIntAttr(ColIdx1);
4794 TInt V2 = RowI.GetIntAttr(ColIdx2);
4795 if (op ==
aoAdd) {
IntCols[ColIdx3][RowI.GetRowIdx()] = V1 + V2; }
4796 if (op ==
aoSub) {
IntCols[ColIdx3][RowI.GetRowIdx()] = V1 - V2; }
4797 if (op ==
aoMul) {
IntCols[ColIdx3][RowI.GetRowIdx()] = V1 * V2; }
4798 if (op ==
aoDiv) {
IntCols[ColIdx3][RowI.GetRowIdx()] = V1 / V2; }
4799 if (op ==
aoMod) {
IntCols[ColIdx3][RowI.GetRowIdx()] = V1 % V2; }
4800 if (op ==
aoMin) {
IntCols[ColIdx3][RowI.GetRowIdx()] = (V1 < V2) ? V1 : V2;}
4801 if (op ==
aoMax) {
IntCols[ColIdx3][RowI.GetRowIdx()] = (V1 > V2) ? V1 : V2;}
4803 TFlt V1 = (Arg1Type ==
atInt) ? (
TFlt)RowI.GetIntAttr(ColIdx1) : RowI.GetFltAttr(ColIdx1);
4804 TFlt V2 = (Arg2Type ==
atInt) ? (
TFlt)RowI.GetIntAttr(ColIdx2) : RowI.GetFltAttr(ColIdx2);
4805 if (op ==
aoAdd) {
FltCols[ColIdx3][RowI.GetRowIdx()] = V1 + V2; }
4806 if (op ==
aoSub) {
FltCols[ColIdx3][RowI.GetRowIdx()] = V1 - V2; }
4807 if (op ==
aoMul) {
FltCols[ColIdx3][RowI.GetRowIdx()] = V1 * V2; }
4808 if (op ==
aoDiv) {
FltCols[ColIdx3][RowI.GetRowIdx()] = V1 / V2; }
4810 if (op ==
aoMin) {
FltCols[ColIdx3][RowI.GetRowIdx()] = (V1 < V2) ? V1 : V2;}
4811 if (op ==
aoMax) {
FltCols[ColIdx3][RowI.GetRowIdx()] = (V1 > V2) ? V1 : V2;}
4859 TExcept::Throw(
"Only numeric columns supported in arithmetic operations.");
4861 if(Arg1Type ==
atInt && Arg2Type ==
atFlt && ResAttr ==
""){
4862 TExcept::Throw(
"Trying to write float values to an existing int-typed column");
4869 TInt ColIdx3 = AddToFirstTable ? ColIdx1 : ColIdx2;
4872 if (ResAttr !=
"") {
4873 if (AddToFirstTable) {
4874 if (Arg1Type ==
atInt && Arg2Type ==
atInt) {
4882 if (Arg1Type ==
atInt && Arg2Type ==
atInt) {